HDFS-15809. DeadNodeDetector does not remove live nodes from dead node set. Contributed by Jinglun.

This commit is contained in:
sunlisheng 2021-03-15 11:34:13 +08:00
parent b1dc6c40a0
commit 7025f39944
4 changed files with 109 additions and 81 deletions

View File

@ -29,9 +29,9 @@
import java.util.HashSet;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.Deque;
import java.util.LinkedList;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
@ -40,8 +40,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_DEFAULT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_CONNECTION_TIMEOUT_MS_DEFAULT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_CONNECTION_TIMEOUT_MS_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_INTERVAL_MS_DEFAULT;
@ -54,9 +52,9 @@
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_THREADS_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_RPC_THREADS_DEFAULT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_RPC_THREADS_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_DEFAULT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_IDLE_SLEEP_MS_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_IDLE_SLEEP_MS_DEFAULT;
/**
* Detect the dead nodes in advance, and share this information among all the
@ -74,7 +72,7 @@ public class DeadNodeDetector extends Daemon {
/**
* Waiting time when DeadNodeDetector's state is idle.
*/
private static final long IDLE_SLEEP_MS = 10000;
private final long idleSleepMs;
/**
* Client context name.
@ -113,16 +111,6 @@ public class DeadNodeDetector extends Daemon {
*/
private long suspectNodeDetectInterval = 0;
/**
* The max queue size of probing dead node.
*/
private int maxDeadNodesProbeQueueLen = 0;
/**
* The max queue size of probing suspect node.
*/
private int maxSuspectNodesProbeQueueLen;
/**
* Connection timeout for probing dead node in milliseconds.
*/
@ -131,12 +119,12 @@ public class DeadNodeDetector extends Daemon {
/**
* The dead node probe queue.
*/
private Queue<DatanodeInfo> deadNodesProbeQueue;
private UniqueQueue<DatanodeInfo> deadNodesProbeQueue;
/**
* The suspect node probe queue.
*/
private Queue<DatanodeInfo> suspectNodesProbeQueue;
private UniqueQueue<DatanodeInfo> suspectNodesProbeQueue;
/**
* The thread pool of probing dead node.
@ -181,6 +169,32 @@ private enum State {
INIT, CHECK_DEAD, IDLE, ERROR
}
/**
* The thread safe unique queue.
*/
static class UniqueQueue<T> {
private Deque<T> queue = new LinkedList<>();
private Set<T> set = new HashSet<>();
synchronized boolean offer(T dn) {
if (set.add(dn)) {
queue.addLast(dn);
return true;
}
return false;
}
synchronized T poll() {
T dn = queue.pollFirst();
set.remove(dn);
return dn;
}
synchronized int size() {
return set.size();
}
}
/**
* Disabled start probe suspect/dead thread for the testing.
*/
@ -203,20 +217,14 @@ public DeadNodeDetector(String name, Configuration conf) {
DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_INTERVAL_MS_DEFAULT);
socketTimeout =
conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, HdfsConstants.READ_TIMEOUT);
maxDeadNodesProbeQueueLen =
conf.getInt(DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_KEY,
DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_DEFAULT);
maxSuspectNodesProbeQueueLen =
conf.getInt(DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_KEY,
DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_DEFAULT);
probeConnectionTimeoutMs = conf.getLong(
DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_CONNECTION_TIMEOUT_MS_KEY,
DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_CONNECTION_TIMEOUT_MS_DEFAULT);
this.deadNodesProbeQueue = new UniqueQueue<>();
this.suspectNodesProbeQueue = new UniqueQueue<>();
this.deadNodesProbeQueue =
new ArrayBlockingQueue<DatanodeInfo>(maxDeadNodesProbeQueueLen);
this.suspectNodesProbeQueue =
new ArrayBlockingQueue<DatanodeInfo>(maxSuspectNodesProbeQueueLen);
idleSleepMs = conf.getLong(DFS_CLIENT_DEAD_NODE_DETECTION_IDLE_SLEEP_MS_KEY,
DFS_CLIENT_DEAD_NODE_DETECTION_IDLE_SLEEP_MS_DEFAULT);
int deadNodeDetectDeadThreads =
conf.getInt(DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_THREADS_KEY,
@ -447,8 +455,7 @@ private void checkDeadNodes() {
for (DatanodeInfo datanodeInfo : datanodeInfos) {
if (!deadNodesProbeQueue.offer(datanodeInfo)) {
LOG.debug("Skip to add dead node {} to check " +
"since the probe queue is full.", datanodeInfo);
break;
"since the node is already in the probe queue.", datanodeInfo);
} else {
LOG.debug("Add dead node to check: {}.", datanodeInfo);
}
@ -458,7 +465,7 @@ private void checkDeadNodes() {
private void idle() {
try {
Thread.sleep(IDLE_SLEEP_MS);
Thread.sleep(idleSleepMs);
} catch (InterruptedException e) {
LOG.debug("Got interrupted while DeadNodeDetector is idle.", e);
Thread.currentThread().interrupt();
@ -483,14 +490,24 @@ private void removeFromDead(DatanodeInfo datanodeInfo) {
deadNodes.remove(datanodeInfo.getDatanodeUuid());
}
public Queue<DatanodeInfo> getDeadNodesProbeQueue() {
public UniqueQueue<DatanodeInfo> getDeadNodesProbeQueue() {
return deadNodesProbeQueue;
}
public Queue<DatanodeInfo> getSuspectNodesProbeQueue() {
public UniqueQueue<DatanodeInfo> getSuspectNodesProbeQueue() {
return suspectNodesProbeQueue;
}
@VisibleForTesting
void setSuspectQueue(UniqueQueue<DatanodeInfo> queue) {
this.suspectNodesProbeQueue = queue;
}
@VisibleForTesting
void setDeadQueue(UniqueQueue<DatanodeInfo> queue) {
this.deadNodesProbeQueue = queue;
}
/**
* Add datanode to suspectNodes and suspectAndDeadNodes.
*/

View File

@ -164,13 +164,9 @@ public interface HdfsClientConfigKeys {
"dfs.client.deadnode.detection.enabled";
boolean DFS_CLIENT_DEAD_NODE_DETECTION_ENABLED_DEFAULT = false;
String DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_KEY =
"dfs.client.deadnode.detection.deadnode.queue.max";
int DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_DEFAULT = 100;
String DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_KEY =
"dfs.client.deadnode.detection.suspectnode.queue.max";
int DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_DEFAULT = 1000;
String DFS_CLIENT_DEAD_NODE_DETECTION_IDLE_SLEEP_MS_KEY =
"dfs.client.deadnode.detection.idle.sleep.ms";
long DFS_CLIENT_DEAD_NODE_DETECTION_IDLE_SLEEP_MS_DEFAULT = 10000;
String DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_CONNECTION_TIMEOUT_MS_KEY =
"dfs.client.deadnode.detection.probe.connection.timeout.ms";

View File

@ -3190,22 +3190,6 @@
</description>
</property>
<property>
<name>dfs.client.deadnode.detection.deadnode.queue.max</name>
<value>100</value>
<description>
The max queue size of probing dead node.
</description>
</property>
<property>
<name>dfs.client.deadnode.detection.suspectnode.queue.max</name>
<value>1000</value>
<description>
The max queue size of probing suspect node.
</description>
</property>
<property>
<name>dfs.client.deadnode.detection.probe.deadnode.threads</name>
<value>10</value>
@ -3214,6 +3198,14 @@
</description>
</property>
<property>
<name>dfs.client.deadnode.detection.idle.sleep.ms</name>
<value>10000</value>
<description>
The sleep time of DeadNodeDetector per iteration.
</description>
</property>
<property>
<name>dfs.client.deadnode.detection.probe.suspectnode.threads</name>
<value>10</value>

View File

@ -30,19 +30,20 @@
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import java.io.IOException;
import java.util.Collection;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CONTEXT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_ENABLED_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_CONNECTION_TIMEOUT_MS_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_INTERVAL_MS_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_INTERVAL_MS_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_IDLE_SLEEP_MS_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertNotSame;
@ -73,6 +74,7 @@ public void setUp() {
DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_CONNECTION_TIMEOUT_MS_KEY,
1000);
conf.setInt(DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY, 0);
conf.setLong(DFS_CLIENT_DEAD_NODE_DETECTION_IDLE_SLEEP_MS_KEY, 100);
}
@After
@ -247,13 +249,15 @@ public void testDeadNodeDetectionDeadNodeRecovery() throws Exception {
}
@Test
public void testDeadNodeDetectionMaxDeadNodesProbeQueue() throws Exception {
conf.setInt(DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_KEY, 1);
public void testDeadNodeDetectionDeadNodeProbe() throws Exception {
FileSystem fs = null;
FSDataInputStream in = null;
Path filePath = new Path("/" + GenericTestUtils.getMethodName());
try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
cluster.waitActive();
FileSystem fs = cluster.getFileSystem();
Path filePath = new Path("/testDeadNodeDetectionMaxDeadNodesProbeQueue");
fs = cluster.getFileSystem();
createFile(fs, filePath);
// Remove three DNs,
@ -261,28 +265,47 @@ public void testDeadNodeDetectionMaxDeadNodesProbeQueue() throws Exception {
cluster.stopDataNode(0);
cluster.stopDataNode(0);
FSDataInputStream in = fs.open(filePath);
in = fs.open(filePath);
DFSInputStream din = (DFSInputStream) in.getWrappedStream();
DFSClient dfsClient = din.getDFSClient();
try {
DeadNodeDetector deadNodeDetector =
dfsClient.getClientContext().getDeadNodeDetector();
// Spy suspect queue and dead queue.
DeadNodeDetector.UniqueQueue<DatanodeInfo> queue =
deadNodeDetector.getSuspectNodesProbeQueue();
DeadNodeDetector.UniqueQueue<DatanodeInfo> suspectSpy =
Mockito.spy(queue);
deadNodeDetector.setSuspectQueue(suspectSpy);
queue = deadNodeDetector.getDeadNodesProbeQueue();
DeadNodeDetector.UniqueQueue<DatanodeInfo> deadSpy = Mockito.spy(queue);
deadNodeDetector.setDeadQueue(deadSpy);
// Trigger dead node detection.
try {
in.read();
} catch (BlockMissingException e) {
}
Thread.sleep(1500);
Assert.assertTrue((dfsClient.getClientContext().getDeadNodeDetector()
.getDeadNodesProbeQueue().size()
+ dfsClient.getDeadNodes(din).size()) <= 4);
Collection<DatanodeInfo> deadNodes =
dfsClient.getDeadNodeDetector().clearAndGetDetectedDeadNodes();
assertEquals(3, deadNodes.size());
for (DatanodeInfo dead : deadNodes) {
// Each node is suspected once then marked as dead.
Mockito.verify(suspectSpy, Mockito.times(1)).offer(dead);
// All the dead nodes should be scheduled and probed at least once.
Mockito.verify(deadSpy, Mockito.atLeastOnce()).offer(dead);
Mockito.verify(deadSpy, Mockito.atLeastOnce()).poll();
}
} finally {
if (in != null) {
in.close();
}
deleteFile(fs, filePath);
}
}
@Test
public void testDeadNodeDetectionSuspectNode() throws Exception {
conf.setInt(DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_KEY, 1);
DeadNodeDetector.setDisabledProbeThreadForTest(true);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive();