From 97b797c314fcc6fe389a7e5635a19b4fc0895d6b Mon Sep 17 00:00:00 2001 From: Inigo Goiri Date: Fri, 28 Feb 2020 18:47:22 -0800 Subject: [PATCH] HDFS-15149. TestDeadNodeDetection test cases time-out. Contributed by Lisheng Sun. --- .../org/apache/hadoop/hdfs/ClientContext.java | 2 +- .../org/apache/hadoop/hdfs/DFSClient.java | 19 +++- .../apache/hadoop/hdfs/DeadNodeDetector.java | 16 ++- .../hadoop/hdfs/TestDeadNodeDetection.java | 98 ++++++++++++++----- 4 files changed, 102 insertions(+), 33 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java index 6ee5277bd2..cbd941b6b9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java @@ -293,7 +293,7 @@ public void stopDeadNodeDetectorThread() { if (deadNodeDetectorThr != null) { deadNodeDetectorThr.interrupt(); try { - deadNodeDetectorThr.join(3000); + deadNodeDetectorThr.join(); } catch (InterruptedException e) { LOG.warn("Encountered exception while waiting to join on dead " + "node detector thread.", e); diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index 0309cbd229..0508e93128 100755 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -247,6 +247,20 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, private final int smallBufferSize; private final long serverDefaultsValidityPeriod; + /** + * Disabled stop DeadNodeDetectorThread for the testing when MiniDFSCluster + * start. + */ + private static volatile boolean disabledStopDeadNodeDetectorThreadForTest = + false; + + @VisibleForTesting + public static void setDisabledStopDeadNodeDetectorThreadForTest( + boolean disabledStopDeadNodeDetectorThreadForTest) { + DFSClient.disabledStopDeadNodeDetectorThreadForTest = + disabledStopDeadNodeDetectorThreadForTest; + } + public DfsClientConf getConf() { return dfsClientConf; } @@ -637,7 +651,10 @@ public synchronized void close() throws IOException { closeAllFilesBeingWritten(false); clientRunning = false; // close dead node detector thread - clientContext.stopDeadNodeDetectorThread(); + if (!disabledStopDeadNodeDetectorThreadForTest) { + clientContext.stopDeadNodeDetectorThread(); + } + // close connections to the namenode closeConnectionToNamenode(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DeadNodeDetector.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DeadNodeDetector.java index 75a91ba718..a573e8a22a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DeadNodeDetector.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DeadNodeDetector.java @@ -243,7 +243,7 @@ public DeadNodeDetector(String name, Configuration conf) { @Override public void run() { - while (true) { + while (!Thread.currentThread().isInterrupted()) { clearAndGetDetectedDeadNodes(); LOG.debug("Current detector state {}, the detected nodes: {}.", state, deadNodes.values()); @@ -261,6 +261,8 @@ public void run() { try { Thread.sleep(ERROR_SLEEP_MS); } catch (InterruptedException e) { + LOG.debug("Got interrupted while DeadNodeDetector is error.", e); + Thread.currentThread().interrupt(); } return; default: @@ -270,8 +272,9 @@ public void run() { } @VisibleForTesting - static void disabledProbeThreadForTest() { - disabledProbeThreadForTest = true; + static void setDisabledProbeThreadForTest( + boolean disabledProbeThreadForTest) { + DeadNodeDetector.disabledProbeThreadForTest = disabledProbeThreadForTest; } /** @@ -426,7 +429,8 @@ private void idle() { try { Thread.sleep(IDLE_SLEEP_MS); } catch (InterruptedException e) { - + LOG.debug("Got interrupted while DeadNodeDetector is idle.", e); + Thread.currentThread().interrupt(); } state = State.CHECK_DEAD; @@ -548,7 +552,9 @@ private static void probeSleep(long time) { try { Thread.sleep(time); } catch (InterruptedException e) { + LOG.debug("Got interrupted while probe is scheduling.", e); Thread.currentThread().interrupt(); + return; } } @@ -566,7 +572,7 @@ static class ProbeScheduler implements Runnable { @Override public void run() { - while (true) { + while (!Thread.currentThread().isInterrupted()) { deadNodeDetector.scheduleProbe(type); if (type == ProbeType.CHECK_SUSPECT) { probeSleep(deadNodeDetector.suspectNodeDetectInterval); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDeadNodeDetection.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDeadNodeDetection.java index a1e53cdb52..a571e46ce8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDeadNodeDetection.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDeadNodeDetection.java @@ -31,12 +31,17 @@ import org.junit.Test; import java.io.IOException; +import java.util.Queue; +import java.util.concurrent.LinkedBlockingQueue; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CONTEXT; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_KEY; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_ENABLED_KEY; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_CONNECTION_TIMEOUT_MS_KEY; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_INTERVAL_MS_KEY; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_INTERVAL_MS_KEY; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_KEY; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY; import static org.junit.Assert.assertEquals; /** @@ -53,9 +58,15 @@ public void setUp() { conf = new HdfsConfiguration(); conf.setBoolean(DFS_CLIENT_DEAD_NODE_DETECTION_ENABLED_KEY, true); conf.setLong( - DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_INTERVAL_MS_KEY, 1000); + DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_INTERVAL_MS_KEY, + 1000); conf.setLong( - DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_INTERVAL_MS_KEY, 100); + DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_INTERVAL_MS_KEY, + 100); + conf.setLong( + DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_CONNECTION_TIMEOUT_MS_KEY, + 1000); + conf.setInt(DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY, 0); } @After @@ -67,6 +78,7 @@ public void tearDown() { @Test public void testDeadNodeDetectionInBackground() throws Exception { + conf.set(DFS_CLIENT_CONTEXT, "testDeadNodeDetectionInBackground"); cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build(); cluster.waitActive(); @@ -102,7 +114,10 @@ public void testDeadNodeDetectionInBackground() throws Exception { } catch (BlockMissingException e) { } - waitForDeadNode(dfsClient, 3); + DefaultCoordination defaultCoordination = new DefaultCoordination(); + defaultCoordination.startWaitForDeadNodeThread(dfsClient, 3); + defaultCoordination.sync(); + assertEquals(3, dfsClient.getDeadNodes(din).size()); assertEquals(3, dfsClient.getClientContext().getDeadNodeDetector() .clearAndGetDetectedDeadNodes().size()); @@ -143,6 +158,10 @@ public void testDeadNodeDetectionInMultipleDFSInputStream() din2 = (DFSInputStream) in2.getWrappedStream(); dfsClient2 = din2.getDFSClient(); + + DefaultCoordination defaultCoordination = new DefaultCoordination(); + defaultCoordination.startWaitForDeadNodeThread(dfsClient2, 1); + defaultCoordination.sync(); assertEquals(dfsClient1.toString(), dfsClient2.toString()); assertEquals(1, dfsClient1.getDeadNodes(din1).size()); assertEquals(1, dfsClient2.getDeadNodes(din2).size()); @@ -173,9 +192,13 @@ public void testDeadNodeDetectionInMultipleDFSInputStream() @Test public void testDeadNodeDetectionDeadNodeRecovery() throws Exception { + // prevent interrupt deadNodeDetectorThr in cluster.waitActive() + DFSClient.setDisabledStopDeadNodeDetectorThreadForTest(true); + conf.set(DFS_CLIENT_CONTEXT, "testDeadNodeDetectionDeadNodeRecovery"); cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build(); cluster.waitActive(); + DFSClient.setDisabledStopDeadNodeDetectorThreadForTest(false); FileSystem fs = cluster.getFileSystem(); Path filePath = new Path("/testDeadNodeDetectionDeadNodeRecovery"); createFile(fs, filePath); @@ -193,14 +216,18 @@ public void testDeadNodeDetectionDeadNodeRecovery() throws Exception { in.read(); } catch (BlockMissingException e) { } - - waitForDeadNode(dfsClient, 3); + DefaultCoordination defaultCoordination = new DefaultCoordination(); + defaultCoordination.startWaitForDeadNodeThread(dfsClient, 3); + defaultCoordination.sync(); assertEquals(3, dfsClient.getDeadNodes(din).size()); assertEquals(3, dfsClient.getClientContext().getDeadNodeDetector() .clearAndGetDetectedDeadNodes().size()); cluster.restartDataNode(one, true); - waitForDeadNode(dfsClient, 2); + + defaultCoordination = new DefaultCoordination(); + defaultCoordination.startWaitForDeadNodeThread(dfsClient, 2); + defaultCoordination.sync(); assertEquals(2, dfsClient.getDeadNodes(din).size()); assertEquals(2, dfsClient.getClientContext().getDeadNodeDetector() .clearAndGetDetectedDeadNodes().size()); @@ -250,7 +277,7 @@ public void testDeadNodeDetectionMaxDeadNodesProbeQueue() throws Exception { @Test public void testDeadNodeDetectionSuspectNode() throws Exception { conf.setInt(DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_KEY, 1); - DeadNodeDetector.disabledProbeThreadForTest(); + DeadNodeDetector.setDisabledProbeThreadForTest(true); cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); cluster.waitActive(); @@ -288,6 +315,8 @@ public void testDeadNodeDetectionSuspectNode() throws Exception { assertEquals(0, dfsClient.getDeadNodes(din).size()); assertEquals(0, dfsClient.getClientContext().getDeadNodeDetector() .clearAndGetDetectedDeadNodes().size()); + // reset disabledProbeThreadForTest + DeadNodeDetector.setDisabledProbeThreadForTest(false); } } @@ -317,24 +346,6 @@ private void deleteFile(FileSystem fs, Path filePath) throws IOException { fs.delete(filePath, true); } - private void waitForDeadNode(DFSClient dfsClient, int size) throws Exception { - GenericTestUtils.waitFor(new Supplier() { - @Override - public Boolean get() { - try { - if (dfsClient.getClientContext().getDeadNodeDetector() - .clearAndGetDetectedDeadNodes().size() == size) { - return true; - } - } catch (Exception e) { - // Ignore the exception - } - - return false; - } - }, 5000, 100000); - } - private void waitForSuspectNode(DFSClient dfsClient) throws Exception { GenericTestUtils.waitFor(new Supplier() { @Override @@ -350,6 +361,41 @@ public Boolean get() { return false; } - }, 5000, 100000); + }, 500, 5000); + } + + class DefaultCoordination { + private Queue queue = new LinkedBlockingQueue(1); + + public boolean addToQueue() { + return queue.offer(new Object()); + } + + public Object removeFromQueue() { + return queue.poll(); + } + + public void sync() { + while (removeFromQueue() == null) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + } + } + } + + private void startWaitForDeadNodeThread(DFSClient dfsClient, int size) { + new Thread(() -> { + DeadNodeDetector deadNodeDetector = + dfsClient.getClientContext().getDeadNodeDetector(); + while (deadNodeDetector.clearAndGetDetectedDeadNodes().size() != size) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + } + } + addToQueue(); + }).start(); + } } }