From 00e99c65943e64fd696ec715cf21e851b93115f1 Mon Sep 17 00:00:00 2001 From: Erik Krogen Date: Tue, 22 May 2018 16:45:26 -0700 Subject: [PATCH] HDFS-13609. [SBN read] Edit Tail Fast Path Part 3: NameNode-side changes to support tailing edits via RPC. Contributed by Erik Krogen. --- .../hdfs/qjournal/client/AsyncLogger.java | 7 + .../hdfs/qjournal/client/AsyncLoggerSet.java | 14 ++ .../qjournal/client/IPCLoggerChannel.java | 14 ++ .../qjournal/client/QuorumJournalManager.java | 111 ++++++++++++++- .../namenode/EditLogFileInputStream.java | 44 ++++++ .../server/namenode/ha/EditLogTailer.java | 6 +- .../src/main/resources/hdfs-default.xml | 4 +- .../client/TestQuorumJournalManager.java | 130 ++++++++++++++++++ .../client/TestQuorumJournalManagerUnit.java | 101 +++++++++++++- .../namenode/TestEditLogFileInputStream.java | 18 +++ 10 files changed, 439 insertions(+), 10 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java index 2633723711..5eead67fa7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java @@ -22,6 +22,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol; +import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto; @@ -107,6 +108,12 @@ public ListenableFuture finalizeLogSegment( * Begin a new epoch on the target node. */ public ListenableFuture newEpoch(long epoch); + + /** + * Fetch journaled edits from the cache. + */ + public ListenableFuture getJournaledEdits( + long fromTxnId, int maxTransactions); /** * Fetch the list of edit logs available on the remote node. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java index 6302b2ac30..f024b0e8c2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java @@ -26,6 +26,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto; +import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto; @@ -261,6 +262,19 @@ public QuorumCall sendEdits( return QuorumCall.create(calls); } + public QuorumCall + getJournaledEdits(long fromTxnId, int maxTransactions) { + Map> calls + = Maps.newHashMap(); + for (AsyncLogger logger : loggers) { + ListenableFuture future = + logger.getJournaledEdits(fromTxnId, maxTransactions); + calls.put(logger, future); + } + return QuorumCall.create(calls); + } + public QuorumCall getEditLogManifest( long fromTxnId, boolean inProgressOk) { Map getJournaledEdits( + long fromTxnId, int maxTransactions) { + return parallelExecutor.submit( + new Callable() { + @Override + public GetJournaledEditsResponseProto call() throws IOException { + return getProxy().getJournaledEdits(journalId, nameServiceId, + fromTxnId, maxTransactions); + } + }); + } + @Override public ListenableFuture getEditLogManifest( final long fromTxnId, final boolean inProgressOk) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java index ba2b20a7bb..80a6273426 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java @@ -21,6 +21,7 @@ import java.net.InetSocketAddress; import java.net.URI; import java.net.URL; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -36,6 +37,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto; @@ -67,6 +69,14 @@ public class QuorumJournalManager implements JournalManager { static final Logger LOG = LoggerFactory.getLogger(QuorumJournalManager.class); + // This config is not publicly exposed + static final String QJM_RPC_MAX_TXNS_KEY = + "dfs.ha.tail-edits.qjm.rpc.max-txns"; + static final int QJM_RPC_MAX_TXNS_DEFAULT = 5000; + + // Maximum number of transactions to fetch at a time when using the + // RPC edit fetch mechanism + private final int maxTxnsPerRpc; // Timeouts for which the QJM will wait for each of the following actions. private final int startSegmentTimeoutMs; private final int prepareRecoveryTimeoutMs; @@ -125,6 +135,10 @@ public QuorumJournalManager(Configuration conf, this.nameServiceId = nameServiceId; this.loggers = new AsyncLoggerSet(createLoggers(loggerFactory)); + this.maxTxnsPerRpc = + conf.getInt(QJM_RPC_MAX_TXNS_KEY, QJM_RPC_MAX_TXNS_DEFAULT); + Preconditions.checkArgument(maxTxnsPerRpc > 0, + "Must specify %s greater than 0!", QJM_RPC_MAX_TXNS_KEY); // Configure timeouts. this.startSegmentTimeoutMs = conf.getInt( DFSConfigKeys.DFS_QJOURNAL_START_SEGMENT_TIMEOUT_KEY, @@ -478,17 +492,104 @@ public void selectInputStreams(Collection streams, public void selectInputStreams(Collection streams, long fromTxnId, boolean inProgressOk, boolean onlyDurableTxns) throws IOException { + if (inProgressOk) { + LOG.info("Tailing edits starting from txn ID " + fromTxnId + + " via RPC mechanism"); + try { + Collection rpcStreams = new ArrayList<>(); + selectRpcInputStreams(rpcStreams, fromTxnId, onlyDurableTxns); + streams.addAll(rpcStreams); + return; + } catch (IOException ioe) { + LOG.warn("Encountered exception while tailing edits >= " + fromTxnId + + " via RPC; falling back to streaming.", ioe); + } + } + selectStreamingInputStreams(streams, fromTxnId, inProgressOk, + onlyDurableTxns); + } + /** + * Select input streams from the journals, specifically using the RPC + * mechanism optimized for low latency. + * + * @param streams The collection to store the return streams into. + * @param fromTxnId Select edits starting from this transaction ID + * @param onlyDurableTxns Iff true, only include transactions which have been + * committed to a quorum of the journals. + * @throws IOException Upon issues, including cache misses on the journals. + */ + private void selectRpcInputStreams(Collection streams, + long fromTxnId, boolean onlyDurableTxns) throws IOException { + QuorumCall q = + loggers.getJournaledEdits(fromTxnId, maxTxnsPerRpc); + Map responseMap = + loggers.waitForWriteQuorum(q, selectInputStreamsTimeoutMs, + "selectRpcInputStreams"); + assert responseMap.size() >= loggers.getMajoritySize() : + "Quorum call returned without a majority"; + + List responseCounts = new ArrayList<>(); + for (GetJournaledEditsResponseProto resp : responseMap.values()) { + responseCounts.add(resp.getTxnCount()); + } + Collections.sort(responseCounts); + int highestTxnCount = responseCounts.get(responseCounts.size() - 1); + if (LOG.isDebugEnabled() || highestTxnCount < 0) { + StringBuilder msg = new StringBuilder("Requested edits starting from "); + msg.append(fromTxnId).append("; got ").append(responseMap.size()) + .append(" responses: <"); + for (Map.Entry ent : + responseMap.entrySet()) { + msg.append("[").append(ent.getKey()).append(", ") + .append(ent.getValue().getTxnCount()).append("],"); + } + msg.append(">"); + if (highestTxnCount < 0) { + throw new IOException("Did not get any valid JournaledEdits " + + "responses: " + msg); + } else { + LOG.debug(msg.toString()); + } + } + + int maxAllowedTxns = !onlyDurableTxns ? highestTxnCount : + responseCounts.get(responseCounts.size() - loggers.getMajoritySize()); + if (maxAllowedTxns == 0) { + LOG.debug("No new edits available in logs; requested starting from " + + "ID " + fromTxnId); + return; + } + LOG.info("Selected loggers with >= " + maxAllowedTxns + + " transactions starting from " + fromTxnId); + PriorityQueue allStreams = new PriorityQueue<>( + JournalSet.EDIT_LOG_INPUT_STREAM_COMPARATOR); + for (GetJournaledEditsResponseProto resp : responseMap.values()) { + long endTxnId = fromTxnId - 1 + + Math.min(maxAllowedTxns, resp.getTxnCount()); + allStreams.add(EditLogFileInputStream.fromByteString( + resp.getEditLog(), fromTxnId, endTxnId, true)); + } + JournalSet.chainAndMakeRedundantStreams(streams, allStreams, fromTxnId); + } + + /** + * Select input streams from the journals, specifically using the streaming + * mechanism optimized for resiliency / bulk load. + */ + private void selectStreamingInputStreams( + Collection streams, long fromTxnId, + boolean inProgressOk, boolean onlyDurableTxns) throws IOException { QuorumCall q = loggers.getEditLogManifest(fromTxnId, inProgressOk); Map resps = loggers.waitForWriteQuorum(q, selectInputStreamsTimeoutMs, - "selectInputStreams"); - - LOG.debug("selectInputStream manifests:\n" + + "selectStreamingInputStreams"); + + LOG.debug("selectStreamingInputStream manifests:\n" + Joiner.on("\n").withKeyValueSeparator(": ").join(resps)); - - final PriorityQueue allStreams = + + final PriorityQueue allStreams = new PriorityQueue(64, JournalSet.EDIT_LOG_INPUT_STREAM_COMPARATOR); for (Map.Entry e : resps.entrySet()) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java index 95a305e735..7dd3d549fb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdfs.server.namenode; +import com.google.protobuf.ByteString; import java.io.BufferedInputStream; import java.io.DataInputStream; import java.io.EOFException; @@ -119,6 +120,23 @@ public static EditLogInputStream fromUrl( return new EditLogFileInputStream(new URLLog(connectionFactory, url), startTxId, endTxId, inProgress); } + + /** + * Create an EditLogInputStream from a {@link ByteString}, i.e. an in-memory + * collection of bytes. + * + * @param bytes The byte string to read from + * @param startTxId the expected starting transaction ID + * @param endTxId the expected ending transaction ID + * @param inProgress whether the log is in-progress + * @return An edit stream to read from + */ + public static EditLogInputStream fromByteString(ByteString bytes, + long startTxId, long endTxId, boolean inProgress) { + return new EditLogFileInputStream(new ByteStringLog(bytes, + String.format("ByteStringEditLog[%d, %d]", startTxId, endTxId)), + startTxId, endTxId, inProgress); + } private EditLogFileInputStream(LogSource log, long firstTxId, long lastTxId, @@ -376,6 +394,32 @@ private interface LogSource { public long length(); public String getName(); } + + private static class ByteStringLog implements LogSource { + private final ByteString bytes; + private final String name; + + public ByteStringLog(ByteString bytes, String name) { + this.bytes = bytes; + this.name = name; + } + + @Override + public InputStream getInputStream() { + return bytes.newInput(); + } + + @Override + public long length() { + return bytes.size(); + } + + @Override + public String getName() { + return name; + } + + } private static class FileLog implements LogSource { private final File file; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java index fc5f3a345e..780a0f6718 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java @@ -145,7 +145,11 @@ public class EditLogTailer { private int maxRetries; /** - * Whether the tailer should tail the in-progress edit log segments. + * Whether the tailer should tail the in-progress edit log segments. If true, + * this will also attempt to optimize for latency when tailing the edit logs + * (if using the + * {@link org.apache.hadoop.hdfs.qjournal.client.QuorumJournalManager}, this + * implies using the RPC-based mechanism to tail edits). */ private final boolean inProgressOk; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index 6b52b0bf59..7704cd5af0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -3193,7 +3193,9 @@ Whether enable standby namenode to tail in-progress edit logs. Clients might want to turn it on when they want Standby NN to have - more up-to-date data. + more up-to-date data. When using the QuorumJournalManager, this enables + tailing of edit logs via the RPC-based mechanism, rather than streaming, + which allows for much fresher data. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java index f7c3a27404..9f089c9b16 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java @@ -44,6 +44,7 @@ import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.qjournal.MiniJournalCluster; import org.apache.hadoop.hdfs.qjournal.QJMTestUtil; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto; @@ -91,6 +92,10 @@ public void setup() throws Exception { conf = new Configuration(); // Don't retry connections - it just slows down the tests. conf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0); + // Turn off IPC client caching to handle daemon restarts. + conf.setInt( + CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY, 0); + conf.setBoolean(DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY, true); cluster = new MiniJournalCluster.Builder(conf) .baseDir(GenericTestUtils.getRandomizedTestDir().getAbsolutePath()) @@ -959,6 +964,131 @@ public void testInProgressRecovery() throws Exception { qjm2.selectInputStreams(streams, 1, true, true); verifyEdits(streams, 1, 8); } + + @Test + public void testSelectViaRpcWithDurableTransactions() throws Exception { + // Two loggers will have up to ID 5, one will have up to ID 6 + failLoggerAtTxn(spies.get(0), 6); + failLoggerAtTxn(spies.get(1), 6); + EditLogOutputStream stm = + qjm.startLogSegment(1, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION); + writeTxns(stm, 1, 5); + try { + writeTxns(stm, 6, 1); + fail("Did not fail to write when only a minority succeeded"); + } catch (QuorumException qe) { + GenericTestUtils.assertExceptionContains( + "too many exceptions to achieve quorum size 2/3", qe); + } + + List streams = new ArrayList<>(); + qjm.selectInputStreams(streams, 1, true, true); + verifyEdits(streams, 1, 5); + IOUtils.closeStreams(streams.toArray(new Closeable[0])); + for (AsyncLogger logger : spies) { + Mockito.verify(logger, Mockito.times(1)).getJournaledEdits(1, + QuorumJournalManager.QJM_RPC_MAX_TXNS_DEFAULT); + } + } + + @Test + public void testSelectViaRpcWithoutDurableTransactions() throws Exception { + setupLoggers345(); + futureThrows(new IOException()).when(spies.get(1)).getJournaledEdits(1, + QuorumJournalManager.QJM_RPC_MAX_TXNS_DEFAULT); + + List streams = new ArrayList<>(); + qjm.selectInputStreams(streams, 1, true, false); + verifyEdits(streams, 1, 5); + IOUtils.closeStreams(streams.toArray(new Closeable[0])); + for (AsyncLogger logger : spies) { + Mockito.verify(logger, Mockito.times(1)).getJournaledEdits(1, + QuorumJournalManager.QJM_RPC_MAX_TXNS_DEFAULT); + } + } + + @Test + public void testSelectViaRpcOneDeadJN() throws Exception { + EditLogOutputStream stm = + qjm.startLogSegment(1, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION); + writeTxns(stm, 1, 10); + + cluster.getJournalNode(0).stopAndJoin(0); + + List streams = new ArrayList<>(); + qjm.selectInputStreams(streams, 1, true, false); + verifyEdits(streams, 1, 10); + IOUtils.closeStreams(streams.toArray(new Closeable[0])); + } + + @Test + public void testSelectViaRpcTwoDeadJNs() throws Exception { + EditLogOutputStream stm = + qjm.startLogSegment(1, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION); + writeTxns(stm, 1, 10); + + cluster.getJournalNode(0).stopAndJoin(0); + cluster.getJournalNode(1).stopAndJoin(0); + + try { + qjm.selectInputStreams(new ArrayList<>(), 1, true, false); + fail(""); + } catch (QuorumException qe) { + GenericTestUtils.assertExceptionContains( + "too many exceptions to achieve quorum size 2/3", qe); + } + } + + @Test + public void testSelectViaRpcTwoJNsError() throws Exception { + EditLogOutputStream stm = + qjm.startLogSegment(1, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION); + writeTxns(stm, 1, 10); + writeTxns(stm, 11, 1); + + futureThrows(new IOException()).when(spies.get(0)).getJournaledEdits(1, + QuorumJournalManager.QJM_RPC_MAX_TXNS_DEFAULT); + futureThrows(new IOException()).when(spies.get(1)).getJournaledEdits(1, + QuorumJournalManager.QJM_RPC_MAX_TXNS_DEFAULT); + + List streams = new ArrayList<>(); + qjm.selectInputStreams(streams, 1, true, true); + // This should still succeed as the QJM should fall back to the streaming + // mechanism for fetching edits + verifyEdits(streams, 1, 11); + IOUtils.closeStreams(streams.toArray(new Closeable[0])); + + for (AsyncLogger logger : spies) { + Mockito.verify(logger, Mockito.times(1)).getEditLogManifest(1, true); + } + } + + @Test + public void testSelectViaRpcAfterJNRestart() throws Exception { + EditLogOutputStream stm = + qjm.startLogSegment(1, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION); + writeTxns(stm, 1, 10); + qjm.finalizeLogSegment(1, 10); + + // Close to avoid connections hanging around after the JNs are restarted + for (int i = 0; i < cluster.getNumNodes(); i++) { + cluster.restartJournalNode(i); + } + cluster.waitActive(); + + qjm = createSpyingQJM(); + spies = qjm.getLoggerSetForTests().getLoggersForTests(); + List streams = new ArrayList<>(); + qjm.selectInputStreams(streams, 1, true, true); + // This should still succeed as the QJM should fall back to the streaming + // mechanism for fetching edits + verifyEdits(streams, 1, 10); + IOUtils.closeStreams(streams.toArray(new Closeable[0])); + + for (AsyncLogger logger : spies) { + Mockito.verify(logger, Mockito.times(1)).getEditLogManifest(1, true); + } + } private QuorumJournalManager createSpyingQJM() throws IOException, URISyntaxException { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManagerUnit.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManagerUnit.java index ebd1b15b4e..30ef21b637 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManagerUnit.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManagerUnit.java @@ -17,11 +17,14 @@ */ package org.apache.hadoop.hdfs.qjournal.client; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.eq; +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; import java.io.IOException; import java.net.URI; import java.util.List; @@ -29,11 +32,11 @@ import org.junit.Assert; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.qjournal.client.AsyncLogger; -import org.apache.hadoop.hdfs.qjournal.client.QuorumException; -import org.apache.hadoop.hdfs.qjournal.client.QuorumJournalManager; +import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto; +import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream; +import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream; import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream; import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; @@ -45,11 +48,15 @@ import org.mockito.stubbing.Stubber; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; +import com.google.protobuf.ByteString; import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.writeOp; +import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.createTxnData; +import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.verifyEdits; /** * True unit tests for QuorumJournalManager @@ -217,6 +224,94 @@ public void testWriteEditsOneSlow() throws Exception { Mockito.verify(spyLoggers.get(0)).setCommittedTxId(1L); } + @Test + public void testReadRpcInputStreams() throws Exception { + for (int jn = 0; jn < 3; jn++) { + futureReturns(getJournaledEditsReponse(1, 3)) + .when(spyLoggers.get(jn)).getJournaledEdits(1, + QuorumJournalManager.QJM_RPC_MAX_TXNS_DEFAULT); + } + + List streams = Lists.newArrayList(); + qjm.selectInputStreams(streams, 1, true, true); + assertEquals(1, streams.size()); + verifyEdits(streams, 1, 3); + } + + @Test + public void testReadRpcMismatchedInputStreams() throws Exception { + for (int jn = 0; jn < 3; jn++) { + futureReturns(getJournaledEditsReponse(1, jn + 1)) + .when(spyLoggers.get(jn)).getJournaledEdits(1, + QuorumJournalManager.QJM_RPC_MAX_TXNS_DEFAULT); + } + + List streams = Lists.newArrayList(); + qjm.selectInputStreams(streams, 1, true, true); + assertEquals(1, streams.size()); + verifyEdits(streams, 1, 2); + } + + @Test + public void testReadRpcInputStreamsOneSlow() throws Exception { + for (int jn = 0; jn < 2; jn++) { + futureReturns(getJournaledEditsReponse(1, jn + 1)) + .when(spyLoggers.get(jn)).getJournaledEdits(1, + QuorumJournalManager.QJM_RPC_MAX_TXNS_DEFAULT); + } + Mockito.doReturn(SettableFuture.create()) + .when(spyLoggers.get(2)).getJournaledEdits(1, + QuorumJournalManager.QJM_RPC_MAX_TXNS_DEFAULT); + + List streams = Lists.newArrayList(); + qjm.selectInputStreams(streams, 1, true, true); + assertEquals(1, streams.size()); + verifyEdits(streams, 1, 1); + } + + @Test + public void testReadRpcInputStreamsOneException() throws Exception { + for (int jn = 0; jn < 2; jn++) { + futureReturns(getJournaledEditsReponse(1, jn + 1)) + .when(spyLoggers.get(jn)).getJournaledEdits(1, + QuorumJournalManager.QJM_RPC_MAX_TXNS_DEFAULT); + } + futureThrows(new IOException()).when(spyLoggers.get(2)) + .getJournaledEdits(1, QuorumJournalManager.QJM_RPC_MAX_TXNS_DEFAULT); + + List streams = Lists.newArrayList(); + qjm.selectInputStreams(streams, 1, true, true); + assertEquals(1, streams.size()); + verifyEdits(streams, 1, 1); + } + + @Test + public void testReadRpcInputStreamsNoNewEdits() throws Exception { + for (int jn = 0; jn < 3; jn++) { + futureReturns(GetJournaledEditsResponseProto.newBuilder() + .setTxnCount(0).setEditLog(ByteString.EMPTY).build()) + .when(spyLoggers.get(jn)) + .getJournaledEdits(1, QuorumJournalManager.QJM_RPC_MAX_TXNS_DEFAULT); + } + + List streams = Lists.newArrayList(); + qjm.selectInputStreams(streams, 1, true, true); + assertEquals(0, streams.size()); + } + + private GetJournaledEditsResponseProto getJournaledEditsReponse( + int startTxn, int numTxns) throws Exception { + ByteArrayOutputStream byteStream = new ByteArrayOutputStream(); + EditLogFileOutputStream.writeHeader( + NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION, + new DataOutputStream(byteStream)); + byteStream.write(createTxnData(startTxn, numTxns)); + return GetJournaledEditsResponseProto.newBuilder() + .setTxnCount(numTxns) + .setEditLog(ByteString.copyFrom(byteStream.toByteArray())) + .build(); + } + private EditLogOutputStream createLogSegment() throws IOException { futureReturns(null).when(spyLoggers.get(0)).startLogSegment(Mockito.anyLong(), Mockito.eq(NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION)); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogFileInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogFileInputStream.java index 3eca80f386..3824b833e7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogFileInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogFileInputStream.java @@ -32,6 +32,7 @@ import java.net.URL; import java.util.EnumMap; +import com.google.protobuf.ByteString; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -80,6 +81,23 @@ public void testReadURL() throws Exception { elis.close(); } + @Test + public void testByteStringLog() throws Exception { + ByteString bytes = ByteString.copyFrom(FAKE_LOG_DATA); + EditLogInputStream elis = EditLogFileInputStream.fromByteString(bytes, + HdfsServerConstants.INVALID_TXID, HdfsServerConstants.INVALID_TXID, + true); + // Read the edit log and verify that all of the data is present + EnumMap> counts = FSImageTestUtil + .countEditLogOpTypes(elis); + assertThat(counts.get(FSEditLogOpCodes.OP_ADD).held, is(1)); + assertThat(counts.get(FSEditLogOpCodes.OP_SET_GENSTAMP_V1).held, is(1)); + assertThat(counts.get(FSEditLogOpCodes.OP_CLOSE).held, is(1)); + + assertEquals(FAKE_LOG_DATA.length, elis.length()); + elis.close(); + } + /** * Regression test for HDFS-8965 which verifies that * FSEditLogFileInputStream#scanOp verifies Op checksums.