From eb96a3093ea34a7749410a63c72b6d0a9636d80f Mon Sep 17 00:00:00 2001 From: Ayush Saxena Date: Wed, 25 Sep 2019 01:16:30 +0530 Subject: [PATCH] HDFS-14655. [SBN Read] Namenode crashes if one of The JN is down. Contributed by Ayush Saxena. --- .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 5 ++- .../qjournal/client/IPCLoggerChannel.java | 14 +++++--- .../hdfs/qjournal/client/QuorumCall.java | 18 ++++++++++ .../qjournal/client/QuorumJournalManager.java | 2 ++ .../src/main/resources/hdfs-default.xml | 8 +++++ .../hdfs/qjournal/MiniJournalCluster.java | 6 +++- .../client/TestQuorumJournalManager.java | 34 +++++++++++++++++-- 7 files changed, 78 insertions(+), 9 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 0826cef837..462c81d2e8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -1171,6 +1171,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final String DFS_QJOURNAL_WRITE_TXNS_TIMEOUT_KEY = "dfs.qjournal.write-txns.timeout.ms"; public static final String DFS_QJOURNAL_HTTP_OPEN_TIMEOUT_KEY = "dfs.qjournal.http.open.timeout.ms"; public static final String DFS_QJOURNAL_HTTP_READ_TIMEOUT_KEY = "dfs.qjournal.http.read.timeout.ms"; + public static final String DFS_QJOURNAL_PARALLEL_READ_NUM_THREADS_KEY = + "dfs.qjournal.parallel-read.num-threads"; public static final int DFS_QJOURNAL_START_SEGMENT_TIMEOUT_DEFAULT = 20000; public static final int DFS_QJOURNAL_PREPARE_RECOVERY_TIMEOUT_DEFAULT = 120000; public static final int DFS_QJOURNAL_ACCEPT_RECOVERY_TIMEOUT_DEFAULT = 120000; @@ -1181,7 +1183,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final int DFS_QJOURNAL_WRITE_TXNS_TIMEOUT_DEFAULT = 20000; public static final int DFS_QJOURNAL_HTTP_OPEN_TIMEOUT_DEFAULT = URLConnectionFactory.DEFAULT_SOCKET_TIMEOUT; public static final int DFS_QJOURNAL_HTTP_READ_TIMEOUT_DEFAULT = URLConnectionFactory.DEFAULT_SOCKET_TIMEOUT; - + public static final int DFS_QJOURNAL_PARALLEL_READ_NUM_THREADS_DEFAULT = 5; + public static final String DFS_MAX_NUM_BLOCKS_TO_LOG_KEY = "dfs.namenode.max-num-blocks-to-log"; public static final long DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT = 1000l; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java index 3a882e5e61..d5ec5ac7ab 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java @@ -27,6 +27,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import org.apache.hadoop.classification.InterfaceAudience; @@ -54,6 +55,7 @@ import org.apache.hadoop.ipc.ProtobufRpcEngine; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.util.StopWatch; +import org.apache.hadoop.util.concurrent.HadoopThreadPoolExecutor; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; @@ -270,12 +272,14 @@ public class IPCLoggerChannel implements AsyncLogger { */ @VisibleForTesting protected ExecutorService createParallelExecutor() { - return Executors.newCachedThreadPool( - new ThreadFactoryBuilder() - .setDaemon(true) + int numThreads = + conf.getInt(DFSConfigKeys.DFS_QJOURNAL_PARALLEL_READ_NUM_THREADS_KEY, + DFSConfigKeys.DFS_QJOURNAL_PARALLEL_READ_NUM_THREADS_DEFAULT); + return new HadoopThreadPoolExecutor(1, numThreads, 60L, + TimeUnit.SECONDS, new LinkedBlockingQueue<>(), + new ThreadFactoryBuilder().setDaemon(true) .setNameFormat("Logger channel (from parallel executor) to " + addr) - .setUncaughtExceptionHandler( - UncaughtExceptionHandlers.systemExit()) + .setUncaughtExceptionHandler(UncaughtExceptionHandlers.systemExit()) .build()); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumCall.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumCall.java index ef32eb11c3..49d999335d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumCall.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumCall.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hdfs.qjournal.client; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.TimeoutException; @@ -64,6 +66,7 @@ class QuorumCall { private static final float WAIT_PROGRESS_WARN_THRESHOLD = 0.7f; private final StopWatch quorumStopWatch; private final Timer timer; + private final List> allCalls; static QuorumCall create( Map> calls, Timer timer) { @@ -71,6 +74,7 @@ class QuorumCall { for (final Entry> e : calls.entrySet()) { Preconditions.checkArgument(e.getValue() != null, "null future for key: " + e.getKey()); + qr.addCall(e.getValue()); Futures.addCallback(e.getValue(), new FutureCallback() { @Override public void onFailure(Throwable t) { @@ -102,6 +106,11 @@ class QuorumCall { // Only instantiated from factory method above this.timer = timer; this.quorumStopWatch = new StopWatch(timer); + this.allCalls = new ArrayList<>(); + } + + private void addCall(ListenableFuture call) { + allCalls.add(call); } /** @@ -211,6 +220,15 @@ class QuorumCall { } } + /** + * Cancel any outstanding calls. + */ + void cancelCalls() { + for (ListenableFuture call : allCalls) { + call.cancel(true); + } + } + /** * Check if any of the responses came back with an AssertionError. * If so, it re-throws it, even if there was a quorum of responses. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java index 674ca704bc..abc2d4c1f9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java @@ -579,6 +579,8 @@ public class QuorumJournalManager implements JournalManager { LOG.debug(msg.toString()); } } + // Cancel any outstanding calls to JN's. + q.cancelCalls(); int maxAllowedTxns = !onlyDurableTxns ? highestTxnCount : responseCounts.get(responseCounts.size() - loggers.getMajoritySize()); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index d34e4cd561..bb0a478438 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -4999,6 +4999,14 @@ + + dfs.qjournal.parallel-read.num-threads + 5 + + Number of threads per JN to be used for tailing edits. + + + dfs.quota.by.storage.type.enabled true diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniJournalCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniJournalCluster.java index b81b710c00..e3e862f34a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniJournalCluster.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniJournalCluster.java @@ -196,7 +196,11 @@ public class MiniJournalCluster { public JournalNode getJournalNode(int i) { return nodes[i].node; } - + + public String getJournalNodeIpcAddress(int i) { + return nodes[i].ipcAddr.toString(); + } + public void restartJournalNode(int i) throws InterruptedException, IOException { JNInfo info = nodes[i]; JournalNode jn = info.node; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java index f3bb954dab..cd0216e2f2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java @@ -49,6 +49,7 @@ import org.apache.hadoop.hdfs.qjournal.MiniJournalCluster; import org.apache.hadoop.hdfs.qjournal.QJMTestUtil; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto; import org.apache.hadoop.hdfs.qjournal.server.JournalFaultInjector; +import org.apache.hadoop.hdfs.qjournal.server.JournalNode; import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream; import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream; import org.apache.hadoop.hdfs.server.namenode.FileJournalManager; @@ -62,7 +63,9 @@ import org.apache.hadoop.test.GenericTestUtils; import org.apache.log4j.Level; import org.junit.After; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TestName; import org.mockito.Mockito; import org.mockito.stubbing.Stubber; @@ -87,11 +90,17 @@ public class TestQuorumJournalManager { GenericTestUtils.setLogLevel(ProtobufRpcEngine.LOG, Level.ALL); } + @Rule + public TestName name = new TestName(); + @Before public void setup() throws Exception { conf = new Configuration(); - // Don't retry connections - it just slows down the tests. - conf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0); + if (!name.getMethodName().equals("testSelectThreadCounts")) { + // Don't retry connections - it just slows down the tests. + conf.setInt( + CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0); + } // Turn off IPC client caching to handle daemon restarts. conf.setInt( CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY, 0); @@ -1039,6 +1048,27 @@ public class TestQuorumJournalManager { } } + @Test + public void testSelectThreadCounts() throws Exception { + EditLogOutputStream stm = + qjm.startLogSegment(1, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION); + writeTxns(stm, 1, 10); + JournalNode jn0 = cluster.getJournalNode(0); + String ipcAddr = cluster.getJournalNodeIpcAddress(0); + jn0.stopAndJoin(0); + for (int i = 0; i < 1000; i++) { + qjm.selectInputStreams(new ArrayList<>(), 1, true, false); + } + String expectedName = + "Logger channel (from parallel executor) to " + ipcAddr; + long num = Thread.getAllStackTraces().keySet().stream() + .filter((t) -> t.getName().contains(expectedName)).count(); + // The number of threads for the stopped jn shouldn't be more than the + // configured value. + assertTrue("Number of threads are : " + num, + num <= DFSConfigKeys.DFS_QJOURNAL_PARALLEL_READ_NUM_THREADS_DEFAULT); + } + @Test public void testSelectViaRpcTwoJNsError() throws Exception { EditLogOutputStream stm =