diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt
index b052050856..63fd1b3bc6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt
@@ -76,3 +76,5 @@ HDFS-3894. QJM: testRecoverAfterDoubleFailures can be flaky due to IPC client ca
 HDFS-3926. QJM: Add user documentation for QJM. (atm)
 
 HDFS-3943. QJM: remove currently-unused md5sum field (todd)
+
+HDFS-3950. QJM: misc TODO cleanup, improved log messages, etc. (todd)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 7e58ea1023..4aadecd5c5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -422,6 +422,7 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String DFS_QJOURNAL_SELECT_INPUT_STREAMS_TIMEOUT_KEY = "dfs.qjournal.select-input-streams.timeout.ms";
   public static final String DFS_QJOURNAL_GET_JOURNAL_STATE_TIMEOUT_KEY = "dfs.qjournal.get-journal-state.timeout.ms";
   public static final String DFS_QJOURNAL_NEW_EPOCH_TIMEOUT_KEY = "dfs.qjournal.new-epoch.timeout.ms";
+  public static final String DFS_QJOURNAL_WRITE_TXNS_TIMEOUT_KEY = "dfs.qjournal.write-txns.timeout.ms";
   public static final int DFS_QJOURNAL_START_SEGMENT_TIMEOUT_DEFAULT = 20000;
   public static final int DFS_QJOURNAL_PREPARE_RECOVERY_TIMEOUT_DEFAULT = 120000;
   public static final int DFS_QJOURNAL_ACCEPT_RECOVERY_TIMEOUT_DEFAULT = 120000;
@@ -429,5 +430,6 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final int DFS_QJOURNAL_SELECT_INPUT_STREAMS_TIMEOUT_DEFAULT = 20000;
   public static final int DFS_QJOURNAL_GET_JOURNAL_STATE_TIMEOUT_DEFAULT = 120000;
   public static final int DFS_QJOURNAL_NEW_EPOCH_TIMEOUT_DEFAULT = 120000;
+  public static final int DFS_QJOURNAL_WRITE_TXNS_TIMEOUT_DEFAULT = 20000;
 
 }
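For readers wiring this up, the new key is an ordinary Configuration knob. A minimal sketch, not part of the patch; the value shown merely restates the default added above:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    public class WriteTimeoutTuning {
      public static void main(String[] args) {
        // Raise or lower the quorum write timeout as needed; 20000ms is
        // the default introduced by this patch.
        Configuration conf = new HdfsConfiguration();
        conf.setInt(DFSConfigKeys.DFS_QJOURNAL_WRITE_TXNS_TIMEOUT_KEY, 20000);
      }
    }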
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java
index 40df91c00e..16cd548cb5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java
@@ -25,24 +25,20 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hdfs.qjournal.protocol.JournalNotFormattedException;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
-import org.apache.hadoop.ipc.RemoteException;
 import org.apache.jasper.compiler.JspUtil;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Maps;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
 
 /**
  * Wrapper around a set of Loggers, taking care of fanning out
@@ -171,6 +167,11 @@ int size() {
     return loggers.size();
   }
 
+  @Override
+  public String toString() {
+    return "[" + Joiner.on(", ").join(loggers) + "]";
+  }
+
   /**
    * Append an HTML-formatted status readout on the current
    * state of the underlying loggers.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java
index 2186d86ba5..1c82af858c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java
@@ -54,6 +54,7 @@
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Stopwatch;
+import com.google.common.net.InetAddresses;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -554,7 +555,8 @@ public Void call() throws IOException {
 
   @Override
   public String toString() {
-    return "Channel to journal node " + addr;
+    return InetAddresses.toAddrString(addr.getAddress()) + ':' +
+        addr.getPort();
   }
 
   @Override
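The two toString() changes compose: each IPCLoggerChannel now renders as a bare ip:port, and AsyncLoggerSet wraps its loggers in brackets, which QuorumJournalManager (later in this patch) prefixes with "QJM to". A standalone sketch with made-up addresses:

    import java.net.InetSocketAddress;
    import com.google.common.base.Joiner;
    import com.google.common.collect.ImmutableList;
    import com.google.common.net.InetAddresses;

    public class ToStringDemo {
      public static void main(String[] args) {
        InetSocketAddress addr = new InetSocketAddress("127.0.0.1", 8485);
        // IPCLoggerChannel#toString: bare ip:port.
        String channel =
            InetAddresses.toAddrString(addr.getAddress()) + ':' + addr.getPort();
        // AsyncLoggerSet#toString: loggers joined inside brackets.
        String set = "[" +
            Joiner.on(", ").join(ImmutableList.of(channel, "127.0.0.1:8486")) + "]";
        System.out.println("QJM to " + set);
        // -> QJM to [127.0.0.1:8485, 127.0.0.1:8486]
      }
    }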
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
index bfcc56f879..6b3503dbba 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
@@ -71,6 +71,7 @@ public class QuorumJournalManager implements JournalManager {
   private final int selectInputStreamsTimeoutMs;
   private final int getJournalStateTimeoutMs;
   private final int newEpochTimeoutMs;
+  private final int writeTxnsTimeoutMs;
 
   // Since these don't occur during normal operation, we can
   // use rather lengthy timeouts, and don't need to make them
@@ -84,6 +85,8 @@ public class QuorumJournalManager implements JournalManager {
   private boolean isActiveWriter;
 
   private final AsyncLoggerSet loggers;
+
+  private int outputBufferCapacity = 512 * 1024;
 
   public QuorumJournalManager(Configuration conf,
       URI uri, NamespaceInfo nsInfo) throws IOException {
@@ -122,8 +125,9 @@ public QuorumJournalManager(Configuration conf,
     this.newEpochTimeoutMs = conf.getInt(
         DFSConfigKeys.DFS_QJOURNAL_NEW_EPOCH_TIMEOUT_KEY,
         DFSConfigKeys.DFS_QJOURNAL_NEW_EPOCH_TIMEOUT_DEFAULT);
-
-
+    this.writeTxnsTimeoutMs = conf.getInt(
+        DFSConfigKeys.DFS_QJOURNAL_WRITE_TXNS_TIMEOUT_KEY,
+        DFSConfigKeys.DFS_QJOURNAL_WRITE_TXNS_TIMEOUT_DEFAULT);
   }
 
   protected List<AsyncLogger> createLoggers(
@@ -303,9 +307,6 @@ private void recoverUnclosedSegment(long segmentTxId) throws IOException {
       return;
     }
-
-    // TODO: check that md5s match up between any "tied" logs
-
     SegmentStateProto logToSync = bestResponse.getSegmentState();
     assert segmentTxId == logToSync.getStartTxId();
@@ -329,12 +330,11 @@ private void recoverUnclosedSegment(long segmentTxId) throws IOException {
     QuorumCall<AsyncLogger, Void> accept = loggers.acceptRecovery(logToSync, syncFromUrl);
     loggers.waitForWriteQuorum(accept, acceptRecoveryTimeoutMs,
         "acceptRecovery(" + TextFormat.shortDebugString(logToSync) + ")");
-
-    // TODO:
-    // we should only try to finalize loggers who successfully synced above
-    // eg if a logger was down, we don't want to send the finalize request.
-    // write a test for this!
-
+
+    // If one of the loggers missed the synchronization step above, but we
+    // still send it a finalize() here, that's OK: the logger validates the
+    // log before finalizing, so even if it is not "in sync", it won't
+    // incorrectly finalize.
     QuorumCall<AsyncLogger, Void> finalize =
         loggers.finalizeLogSegment(logToSync.getStartTxId(), logToSync.getEndTxId());
     loggers.waitForWriteQuorum(finalize, finalizeSegmentTimeoutMs,
@@ -386,7 +386,8 @@ public EditLogOutputStream startLogSegment(long txId) throws IOException {
     QuorumCall<AsyncLogger, Void> q = loggers.startLogSegment(txId);
     loggers.waitForWriteQuorum(q, startSegmentTimeoutMs,
         "startLogSegment(" + txId + ")");
-    return new QuorumOutputStream(loggers, txId);
+    return new QuorumOutputStream(loggers, txId,
+        outputBufferCapacity, writeTxnsTimeoutMs);
   }
 
   @Override
@@ -400,8 +401,7 @@ public void finalizeLogSegment(long firstTxId, long lastTxId)
 
   @Override
   public void setOutputBufferCapacity(int size) {
-    // TODO Auto-generated method stub
-
+    outputBufferCapacity = size;
   }
 
   @Override
@@ -416,9 +416,14 @@ public void purgeLogsOlderThan(long minTxIdToKeep) throws IOException {
   public void recoverUnfinalizedSegments() throws IOException {
     Preconditions.checkState(!isActiveWriter, "already active writer");
 
+    LOG.info("Starting recovery process for unclosed journal segments...");
     Map<AsyncLogger, NewEpochResponseProto> resps = createNewUniqueEpoch();
-    LOG.info("newEpoch(" + loggers.getEpoch() + ") responses:\n" +
+    LOG.info("Successfully started new epoch " + loggers.getEpoch());
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("newEpoch(" + loggers.getEpoch() + ") responses:\n" +
         QuorumCall.mapToString(resps));
+    }
 
     long mostRecentSegmentTxId = Long.MIN_VALUE;
     for (NewEpochResponseProto r : resps.values()) {
@@ -476,7 +481,7 @@ public void selectInputStreams(Collection<EditLogInputStream> streams,
 
   @Override
   public String toString() {
-    return "Quorum journal manager " + uri;
+    return "QJM to " + loggers;
   }
 
   @VisibleForTesting
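Taken together, these changes mean a writer's buffer capacity and quorum write timeout are now both honored rather than hardcoded. A hedged lifecycle sketch; the wrapper class and argument values are invented, but the method names and constructor are the ones shown above:

    import java.io.IOException;
    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.qjournal.client.QuorumJournalManager;
    import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
    import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;

    public class QjmWriteSketch {
      // Open a segment for writing against the patched QJM.
      static EditLogOutputStream openSegment(Configuration conf, URI journalUri,
          NamespaceInfo nsInfo, long firstTxId) throws IOException {
        QuorumJournalManager jm = new QuorumJournalManager(conf, journalUri, nsInfo);
        jm.setOutputBufferCapacity(1024 * 1024); // now honored, was a TODO stub
        jm.recoverUnfinalizedSegments();         // fences older writers via a new epoch
        return jm.startLogSegment(firstTxId);    // buffered, flushed to a quorum
      }
    }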
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumOutputStream.java
index 6d02b589bc..7a9549d920 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumOutputStream.java
@@ -32,13 +32,16 @@ class QuorumOutputStream extends EditLogOutputStream {
   private final AsyncLoggerSet loggers;
   private EditsDoubleBuffer buf;
   private final long segmentTxId;
+  private final int writeTimeoutMs;
 
   public QuorumOutputStream(AsyncLoggerSet loggers,
-      long txId) throws IOException {
+      long txId, int outputBufferCapacity,
+      int writeTimeoutMs) throws IOException {
     super();
-    this.buf = new EditsDoubleBuffer(256*1024); // TODO: conf
+    this.buf = new EditsDoubleBuffer(outputBufferCapacity);
     this.loggers = loggers;
     this.segmentTxId = txId;
+    this.writeTimeoutMs = writeTimeoutMs;
   }
 
   @Override
@@ -101,7 +104,7 @@ protected void flushAndSync(boolean durable) throws IOException {
       QuorumCall<AsyncLogger, Void> qcall = loggers.sendEdits(
           segmentTxId, firstTxToFlush,
           numReadyTxns, data);
-      loggers.waitForWriteQuorum(qcall, 20000, "sendEdits"); // TODO: configurable timeout
+      loggers.waitForWriteQuorum(qcall, writeTimeoutMs, "sendEdits");
 
       // Since we successfully wrote this batch, let the loggers know. Any future
       // RPCs will thus let the loggers know of the most recent transaction, even
@@ -118,4 +121,8 @@ public String generateHtmlReport() {
     return sb.toString();
   }
 
+  @Override
+  public String toString() {
+    return "QuorumOutputStream starting at txid " + segmentTxId;
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/GetJournalEditServlet.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/GetJournalEditServlet.java
index 278dc85961..fc4393d2bd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/GetJournalEditServlet.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/GetJournalEditServlet.java
@@ -37,18 +37,15 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.hdfs.HAUtil;
 import org.apache.hadoop.hdfs.qjournal.client.QuorumJournalManager;
 import org.apache.hadoop.hdfs.server.common.JspHelper;
 import org.apache.hadoop.hdfs.server.namenode.FileJournalManager;
 import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;
 import org.apache.hadoop.hdfs.server.namenode.GetImageServlet;
-import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.SecondaryNameNode;
 import org.apache.hadoop.hdfs.server.namenode.TransferFsImage;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.util.DataTransferThrottler;
-import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.ServletUtil;
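For readers unfamiliar with EditsDoubleBuffer, the idea the capacity parameter feeds into is a classic double buffer: appends go to one side while the previously filled side is flushed to the quorum. A toy model of the concept, not the real EditsDoubleBuffer API:

    import java.util.ArrayList;
    import java.util.List;

    // Illustrative only: writers append to 'current' while 'ready' is
    // being shipped, so a slow quorum flush never blocks new appends.
    public class DoubleBufferSketch {
      private List<byte[]> current = new ArrayList<byte[]>(); // accepts new edits
      private List<byte[]> ready = new ArrayList<byte[]>();   // queued for flush

      synchronized void write(byte[] op) {
        current.add(op);
      }

      synchronized void setReadyToFlush() {
        List<byte[]> tmp = ready; // swap roles; 'ready' must be empty here
        ready = current;
        current = tmp;
      }

      synchronized List<byte[]> drainReady() {
        List<byte[]> out = new ArrayList<byte[]>(ready); // batch for the sender
        ready.clear();
        return out;
      }
    }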
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java
index 5207228aa6..161c67737d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java
@@ -28,8 +28,6 @@
 import org.apache.hadoop.hdfs.server.namenode.NNStorage;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 
-import com.google.common.base.Preconditions;
-
 /**
  * A {@link Storage} implementation for the {@link JournalNode}.
  *
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
index d4953bc9a0..b6d90ef037 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
@@ -52,6 +52,7 @@
 import org.apache.hadoop.hdfs.util.BestEffortLongFile;
 import org.apache.hadoop.hdfs.util.PersistentLongFile;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.security.SecurityUtil;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -87,6 +88,17 @@ class Journal implements Closeable {
    * number of that writer is stored persistently on disk.
    */
   private PersistentLongFile lastPromisedEpoch;
+
+  /**
+   * Each IPC that comes from a given client contains a serial number
+   * which only increases from the client's perspective. Whenever
+   * we switch epochs, we reset this back to -1. Whenever an IPC
+   * comes from a client, we ensure that it is strictly higher
+   * than any previous IPC. This guards against any bugs in the IPC
+   * layer that would re-order IPCs or cause a stale retry from an old
+   * request to resurface and confuse things.
+   */
+  private long currentEpochIpcSerial = -1;
 
   /**
    * The epoch number of the last writer to actually write a transaction.
@@ -262,13 +274,15 @@ synchronized NewEpochResponseProto newEpoch(
     checkFormatted();
     storage.checkConsistentNamespace(nsInfo);
-
+
+    // Check that the new epoch being proposed is in fact newer than
+    // any other that we've promised.
     if (epoch <= getLastPromisedEpoch()) {
       throw new IOException("Proposed epoch " + epoch + " <= last promise " +
           getLastPromisedEpoch());
     }
 
-    lastPromisedEpoch.set(epoch);
+    updateLastPromisedEpoch(epoch);
     abortCurSegment();
 
     NewEpochResponseProto.Builder builder =
@@ -283,6 +297,16 @@ synchronized NewEpochResponseProto newEpoch(
     return builder.build();
   }
 
+  private void updateLastPromisedEpoch(long newEpoch) throws IOException {
+    LOG.info("Updating lastPromisedEpoch from " + lastPromisedEpoch.get() +
+        " to " + newEpoch + " for client " + Server.getRemoteIp());
+    lastPromisedEpoch.set(newEpoch);
+
+    // Since we have a new writer, reset the IPC serial - it will start
+    // counting again from 0 for this writer.
+    currentEpochIpcSerial = -1;
+  }
+
   private void abortCurSegment() throws IOException {
     if (curSegment == null) {
       return;
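The newEpoch()/updateLastPromisedEpoch() pair is the Paxos-style promise that fences older writers. A toy model of the invariant being enforced, not HDFS code:

    import java.io.IOException;

    // Illustrative only: a journal accepts a proposed epoch only if it is
    // strictly greater than every epoch it has promised before, then resets
    // the per-epoch IPC serial counter for the new writer.
    public class PromiseSketch {
      private long lastPromisedEpoch = 0;
      private long currentEpochIpcSerial = -1;

      synchronized void newEpoch(long epoch) throws IOException {
        if (epoch <= lastPromisedEpoch) {
          throw new IOException(
              "Proposed epoch " + epoch + " <= last promise " + lastPromisedEpoch);
        }
        lastPromisedEpoch = epoch;  // fences any older writer
        currentEpochIpcSerial = -1; // new writer counts serials from 0
      }
    }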
@@ -372,14 +396,19 @@ private synchronized void checkRequest(RequestInfo reqInfo) throws IOException {
       throw new IOException("IPC's epoch " + reqInfo.getEpoch() +
           " is less than the last promised epoch " +
           lastPromisedEpoch.get());
+    } else if (reqInfo.getEpoch() > lastPromisedEpoch.get()) {
+      // A newer client has arrived. Fence any previous writers by updating
+      // the promise.
+      updateLastPromisedEpoch(reqInfo.getEpoch());
     }
-
-    // TODO: should other requests check the _exact_ epoch instead of
-    // the <= check? <= should probably only be necessary for the
-    // first calls
-
-    // TODO: some check on serial number that they only increase from a given
-    // client
+
+    // Ensure that the IPCs are arriving in-order as expected.
+    checkSync(reqInfo.getIpcSerialNumber() > currentEpochIpcSerial,
+        "IPC serial %s from client %s was not higher than prior highest " +
+        "IPC serial %s", reqInfo.getIpcSerialNumber(),
+        Server.getRemoteIp(),
+        currentEpochIpcSerial);
+    currentEpochIpcSerial = reqInfo.getIpcSerialNumber();
 
     if (reqInfo.hasCommittedTxId()) {
       Preconditions.checkArgument(
@@ -424,6 +453,22 @@ private void checkSync(boolean expression, String msg,
     }
   }
 
+  /**
+   * @throws AssertionError if the given expression is not true.
+   * The message of the exception is formatted using the 'msg' and
+   * 'formatArgs' parameters.
+   *
+   * This should be used in preference to Java's built-in assert in
+   * non-performance-critical paths, where a failure of this invariant
+   * might cause the protocol to lose data.
+   */
+  private void alwaysAssert(boolean expression, String msg,
+      Object... formatArgs) {
+    if (!expression) {
+      throw new AssertionError(String.format(msg, formatArgs));
+    }
+  }
+
   /**
    * Start a new segment at the given txid. The previous segment
    * must have already been finalized.
@@ -466,7 +511,9 @@ public synchronized void startLogSegment(RequestInfo reqInfo, long txid)
     long curLastWriterEpoch = lastWriterEpoch.get();
     if (curLastWriterEpoch != reqInfo.getEpoch()) {
-      LOG.info("Recording lastWriterEpoch = " + reqInfo.getEpoch());
+      LOG.info("Updating lastWriterEpoch from " + curLastWriterEpoch +
+          " to " + reqInfo.getEpoch() + " for client " +
+          Server.getRemoteIp());
       lastWriterEpoch.set(reqInfo.getEpoch());
     }
@@ -689,9 +736,8 @@ public synchronized void acceptRecovery(RequestInfo reqInfo,
 
     long segmentTxId = segment.getStartTxId();
 
-    // TODO: right now, a recovery of a segment when the log is
-    // completely emtpy (ie startLogSegment() but no txns)
-    // will fail this assertion here, since endTxId < startTxId
+    // Basic sanity checks that the segment is well-formed and contains
+    // at least one transaction.
     Preconditions.checkArgument(segment.getEndTxId() > 0 &&
         segment.getEndTxId() >= segmentTxId,
         "bad recovery state for segment %s: %s",
@@ -702,8 +748,12 @@ public synchronized void acceptRecovery(RequestInfo reqInfo,
         .setAcceptedInEpoch(reqInfo.getEpoch())
         .setSegmentState(segment)
         .build();
+
+    // If we previously acted on acceptRecovery() from a higher-numbered writer,
+    // this call is out of sync. We should never actually trigger this, since the
+    // checkRequest() call above should filter non-increasing epoch numbers.
     if (oldData != null) {
-      Preconditions.checkState(oldData.getAcceptedInEpoch() <= reqInfo.getEpoch(),
+      alwaysAssert(oldData.getAcceptedInEpoch() <= reqInfo.getEpoch(),
           "Bad paxos transition, out-of-order epochs.\nOld: %s\nNew: %s\n",
           oldData, newData);
     }
@@ -737,6 +787,12 @@ public synchronized void acceptRecovery(RequestInfo reqInfo,
           committedTxnId.get());
     }
 
+    // Another paranoid check: we should not be asked to synchronize a log
+    // on top of a finalized segment.
+    alwaysAssert(currentSegment.getIsInProgress(),
+        "Should never be asked to synchronize a different log on top of an " +
+        "already-finalized segment");
+
     // If we're shortening the log, update our highest txid
     // used for lag metrics.
     if (txnRange(currentSegment).contains(highestWrittenTxId)) {
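The new checkRequest() logic reduces to a small per-epoch state machine: within one epoch, IPC serials must strictly increase, and the counter resets when a newer writer takes over. A toy model, not HDFS code:

    import java.io.IOException;

    // Illustrative only: rejects reordered or stale-retried RPCs from the
    // current writer, mirroring the checkSync() call added above.
    public class SerialCheckSketch {
      private long currentEpochIpcSerial = -1;

      synchronized void checkAndAdvance(long ipcSerial) throws IOException {
        if (ipcSerial <= currentEpochIpcSerial) {
          // A reordered RPC, or a stale retry resurfacing: refuse it.
          throw new IOException("IPC serial " + ipcSerial +
              " was not higher than prior highest " + currentEpochIpcSerial);
        }
        currentEpochIpcSerial = ipcSerial;
      }
    }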
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalMetrics.java
index f6d87c7e92..7bbee5b5ac 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalMetrics.java
@@ -24,7 +24,6 @@
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.metrics2.lib.MetricsRegistry;
 import org.apache.hadoop.metrics2.lib.MutableCounterLong;
-import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
 import org.apache.hadoop.metrics2.lib.MutableQuantiles;
 
 /**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java
index 02d9df0c23..05a49566c0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java
@@ -70,11 +70,14 @@ class JournalNodeRpcServer implements QJournalProtocol {
     BlockingService service = QJournalProtocolService
         .newReflectiveBlockingService(translator);
 
-    this.server = RPC.getServer(
-        QJournalProtocolPB.class,
-        service, addr.getHostName(),
-        addr.getPort(), HANDLER_COUNT, false, confCopy,
-        null /*secretManager*/);
+    this.server = new RPC.Builder(confCopy)
+        .setProtocol(QJournalProtocolPB.class)
+        .setInstance(service)
+        .setBindAddress(addr.getHostName())
+        .setPort(addr.getPort())
+        .setNumHandlers(HANDLER_COUNT)
+        .setVerbose(false)
+        .build();
 
     // set service-level authorization security policy
     if (confCopy.getBoolean(
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/PersistentLongFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/PersistentLongFile.java
index 5e776226fa..292d0dfe63 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/PersistentLongFile.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/PersistentLongFile.java
@@ -57,7 +57,9 @@ public long get() throws IOException {
   }
 
   public void set(long newVal) throws IOException {
-    writeFile(file, newVal);
+    if (value != newVal || !loaded) {
+      writeFile(file, newVal);
+    }
     value = newVal;
     loaded = true;
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestNNWithQJM.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestNNWithQJM.java
index 2284faf8ff..a5463d03b3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestNNWithQJM.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestNNWithQJM.java
@@ -211,7 +211,7 @@ public void testWebPageHasQjmInfo() throws Exception {
     cluster.getFileSystem().mkdirs(TEST_PATH);
 
     String contents = DFSTestUtil.urlGet(url);
-    assertTrue(contents.contains("Channel to journal node"));
+    assertTrue(contents.contains("QJM to ["));
     assertTrue(contents.contains("Written txid 2"));
 
     // Stop one JN, do another txn, and make sure it shows as behind
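The PersistentLongFile change makes set() idempotent on disk: re-promising the same epoch no longer rewrites and re-syncs the file. A usage sketch; the file path is invented, and the (File, long) constructor is taken from the existing class:

    import java.io.File;
    import java.io.IOException;
    import org.apache.hadoop.hdfs.util.PersistentLongFile;

    public class PromiseFileDemo {
      public static void main(String[] args) throws IOException {
        // Second argument is the default value returned before any set().
        PersistentLongFile epochFile =
            new PersistentLongFile(new File("/tmp/lastPromisedEpoch"), 0);
        epochFile.set(5); // writes the file
        epochFile.set(5); // after this patch: a no-op, value already persisted
        epochFile.set(6); // writes again
      }
    }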
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestEpochsAreUnique.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestEpochsAreUnique.java
index 96fd6fe47b..41138d0eef 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestEpochsAreUnique.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestEpochsAreUnique.java
@@ -22,7 +22,6 @@
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.URI;
-import java.util.List;
 import java.util.Random;
 
 import org.apache.commons.logging.Log;
@@ -30,7 +29,6 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.qjournal.MiniJournalCluster;
 import org.apache.hadoop.hdfs.qjournal.client.AsyncLogger;
-import org.apache.hadoop.hdfs.qjournal.client.AsyncLoggerSet;
 import org.apache.hadoop.hdfs.qjournal.client.QuorumJournalManager;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.junit.Test;
@@ -38,7 +36,6 @@
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
-import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQJMWithFaults.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQJMWithFaults.java
index 44195a1af5..eeac5438e6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQJMWithFaults.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQJMWithFaults.java
@@ -74,6 +74,8 @@ public class TestQJMWithFaults {
   private static final int SEGMENTS_PER_WRITER = 2;
 
   private static Configuration conf = new Configuration();
+
+  static {
     // Don't retry connections - it just slows down the tests.
     conf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java
index 461c4a7472..1352dcf552 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java
@@ -799,6 +799,13 @@ public void testPurgeLogs() throws Exception {
         "3");
   }
 
+  @Test
+  public void testToString() throws Exception {
+    GenericTestUtils.assertMatches(
+        qjm.toString(),
+        "QJM to \\[127.0.0.1:\\d+, 127.0.0.1:\\d+, 127.0.0.1:\\d+\\]");
+  }
+
   private QuorumJournalManager createSpyingQJM()
       throws IOException, URISyntaxException {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java
index aaf1f883cd..c9db35faca 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java
@@ -177,7 +177,7 @@ public void testNewEpochAtBeginningOfSegment() throws Exception {
     journal.journal(makeRI(2), 1, 1, 2,
         QJMTestUtil.createTxnData(1, 2));
     journal.finalizeLogSegment(makeRI(3), 1, 2);
-    journal.startLogSegment(makeRI(3), 3);
+    journal.startLogSegment(makeRI(4), 3);
     NewEpochResponseProto resp = journal.newEpoch(FAKE_NSINFO, 2);
     assertEquals(1, resp.getLastSegmentTxId());
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java
index 11c4afc1a9..ac9fbd950b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java
@@ -240,7 +240,6 @@ public void testAcceptRecoveryBehavior() throws Exception {
     assertTrue(prep.hasSegmentState());
 
     // accept() should save the accepted value in persistent storage
-    // TODO: should be able to accept without a URL here
     ch.acceptRecovery(prep.getSegmentState(), new URL("file:///dev/null")).get();
 
     // So another prepare() call from a new epoch would return this value
@@ -326,12 +325,7 @@ private void doPerfTest(int editsSize, int numEdits) throws Exception {
         " bytes in " + time + "ms");
 
     float avgRtt = (float)time/(float)numEdits;
    long throughput = ((long)numEdits * editsSize * 1000L)/time;
-    System.err.println("Time per batch: " + avgRtt);
+    System.err.println("Time per batch: " + avgRtt + "ms");
     System.err.println("Throughput: " + throughput + " bytes/sec");
   }
-
-  // TODO:
-  // - add test that checks formatting behavior
-  // - add test that checks rejects newEpoch if nsinfo doesn't match
-
 }
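The makeRI(3) -> makeRI(4) fix in TestJournal above is a direct consequence of the new IPC-serial check: serials must now strictly increase within an epoch, so reusing serial 3 would be rejected. An illustrative JUnit sketch of that behavior, assuming the existing TestJournal fixtures (journal, makeRI, and the segment setup from testNewEpochAtBeginningOfSegment) plus Hadoop's GenericTestUtils:

    @Test
    public void testSerialReuseRejected() throws Exception {
      journal.finalizeLogSegment(makeRI(3), 1, 2);  // IPC serial 3: accepted
      try {
        journal.startLogSegment(makeRI(3), 3);      // serial 3 reused
        fail("Expected the reused IPC serial to be rejected");
      } catch (IOException ioe) {
        GenericTestUtils.assertExceptionContains("was not higher than", ioe);
      }
    }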