HDFS-3950. QJM: misc TODO cleanup, improved log messages, etc. Contributed by Todd Lipcon.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-3077@1387704 13f79535-47bb-0310-9956-ffa450edef68
parent e9f4de5ced
commit 663e7484c0
@@ -76,3 +76,5 @@ HDFS-3894. QJM: testRecoverAfterDoubleFailures can be flaky due to IPC client ca
 HDFS-3926. QJM: Add user documentation for QJM. (atm)

 HDFS-3943. QJM: remove currently-unused md5sum field (todd)
+
+HDFS-3950. QJM: misc TODO cleanup, improved log messages, etc. (todd)
@@ -422,6 +422,7 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String DFS_QJOURNAL_SELECT_INPUT_STREAMS_TIMEOUT_KEY = "dfs.qjournal.select-input-streams.timeout.ms";
   public static final String DFS_QJOURNAL_GET_JOURNAL_STATE_TIMEOUT_KEY = "dfs.qjournal.get-journal-state.timeout.ms";
   public static final String DFS_QJOURNAL_NEW_EPOCH_TIMEOUT_KEY = "dfs.qjournal.new-epoch.timeout.ms";
+  public static final String DFS_QJOURNAL_WRITE_TXNS_TIMEOUT_KEY = "dfs.qjournal.write-txns.timeout.ms";
   public static final int DFS_QJOURNAL_START_SEGMENT_TIMEOUT_DEFAULT = 20000;
   public static final int DFS_QJOURNAL_PREPARE_RECOVERY_TIMEOUT_DEFAULT = 120000;
   public static final int DFS_QJOURNAL_ACCEPT_RECOVERY_TIMEOUT_DEFAULT = 120000;
@@ -429,5 +430,6 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final int DFS_QJOURNAL_SELECT_INPUT_STREAMS_TIMEOUT_DEFAULT = 20000;
   public static final int DFS_QJOURNAL_GET_JOURNAL_STATE_TIMEOUT_DEFAULT = 120000;
   public static final int DFS_QJOURNAL_NEW_EPOCH_TIMEOUT_DEFAULT = 120000;
+  public static final int DFS_QJOURNAL_WRITE_TXNS_TIMEOUT_DEFAULT = 20000;
 }
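Note (illustrative, not part of the patch): the new write-txns key and default follow the same pattern as the existing QJM timeouts, so they can be read from a Configuration in the usual way. A minimal sketch, assuming the HDFS jars from this branch are on the classpath:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;

    public class WriteTxnsTimeoutExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Operators would normally set this in hdfs-site.xml; set it
        // programmatically here purely for illustration.
        conf.setInt(DFSConfigKeys.DFS_QJOURNAL_WRITE_TXNS_TIMEOUT_KEY, 30000);

        int writeTxnsTimeoutMs = conf.getInt(
            DFSConfigKeys.DFS_QJOURNAL_WRITE_TXNS_TIMEOUT_KEY,
            DFSConfigKeys.DFS_QJOURNAL_WRITE_TXNS_TIMEOUT_DEFAULT);
        System.out.println("write-txns timeout = " + writeTxnsTimeoutMs + "ms");
      }
    }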
@@ -25,24 +25,20 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hdfs.qjournal.protocol.JournalNotFormattedException;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
-import org.apache.hadoop.ipc.RemoteException;
 import org.apache.jasper.compiler.JspUtil;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Maps;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
 
 /**
  * Wrapper around a set of Loggers, taking care of fanning out
@@ -171,6 +167,11 @@ int size() {
     return loggers.size();
   }
 
+  @Override
+  public String toString() {
+    return "[" + Joiner.on(", ").join(loggers) + "]";
+  }
+
   /**
    * Append an HTML-formatted status readout on the current
    * state of the underlying loggers.
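Note (illustrative, not part of the patch): the new AsyncLoggerSet#toString() simply joins the per-logger toString() values with Guava's Joiner. A standalone sketch with made-up logger names:

    import java.util.Arrays;
    import java.util.List;

    import com.google.common.base.Joiner;

    public class JoinerToStringExample {
      public static void main(String[] args) {
        // Stand-ins for the per-JournalNode loggers; the real code joins the
        // loggers' own toString() values the same way.
        List<String> loggers = Arrays.asList("127.0.0.1:8485", "127.0.0.1:8486");
        String s = "[" + Joiner.on(", ").join(loggers) + "]";
        System.out.println(s);  // prints: [127.0.0.1:8485, 127.0.0.1:8486]
      }
    }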
@@ -54,6 +54,7 @@
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Stopwatch;
+import com.google.common.net.InetAddresses;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -554,7 +555,8 @@ public Void call() throws IOException {
 
   @Override
   public String toString() {
-    return "Channel to journal node " + addr;
+    return InetAddresses.toAddrString(addr.getAddress()) + ':' +
+        addr.getPort();
   }
 
   @Override
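Note (illustrative, not part of the patch): the new toString() renders the channel as a compact "ip:port" string, which is what the updated tests later match against. A small sketch of the same Guava call, using a made-up local address and the conventional JournalNode port:

    import java.net.InetSocketAddress;

    import com.google.common.net.InetAddresses;

    public class ChannelNameExample {
      public static void main(String[] args) {
        InetSocketAddress addr = new InetSocketAddress("127.0.0.1", 8485);
        // toAddrString prints the resolved IP rather than the hostname string.
        String name = InetAddresses.toAddrString(addr.getAddress()) + ':' + addr.getPort();
        System.out.println(name);  // prints: 127.0.0.1:8485
      }
    }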
@@ -71,6 +71,7 @@ public class QuorumJournalManager implements JournalManager {
   private final int selectInputStreamsTimeoutMs;
   private final int getJournalStateTimeoutMs;
   private final int newEpochTimeoutMs;
+  private final int writeTxnsTimeoutMs;
 
   // Since these don't occur during normal operation, we can
   // use rather lengthy timeouts, and don't need to make them
@@ -84,6 +85,8 @@ public class QuorumJournalManager implements JournalManager {
   private boolean isActiveWriter;
 
   private final AsyncLoggerSet loggers;
 
+  private int outputBufferCapacity = 512 * 1024;
+
   public QuorumJournalManager(Configuration conf,
       URI uri, NamespaceInfo nsInfo) throws IOException {
@@ -122,8 +125,9 @@ public QuorumJournalManager(Configuration conf,
     this.newEpochTimeoutMs = conf.getInt(
         DFSConfigKeys.DFS_QJOURNAL_NEW_EPOCH_TIMEOUT_KEY,
         DFSConfigKeys.DFS_QJOURNAL_NEW_EPOCH_TIMEOUT_DEFAULT);
+    this.writeTxnsTimeoutMs = conf.getInt(
+        DFSConfigKeys.DFS_QJOURNAL_WRITE_TXNS_TIMEOUT_KEY,
+        DFSConfigKeys.DFS_QJOURNAL_WRITE_TXNS_TIMEOUT_DEFAULT);
   }
 
   protected List<AsyncLogger> createLoggers(
@@ -303,9 +307,6 @@ private void recoverUnclosedSegment(long segmentTxId) throws IOException {
       return;
     }
 
-
-    // TODO: check that md5s match up between any "tied" logs
-
     SegmentStateProto logToSync = bestResponse.getSegmentState();
     assert segmentTxId == logToSync.getStartTxId();
 
@@ -329,12 +330,11 @@ private void recoverUnclosedSegment(long segmentTxId) throws IOException {
     QuorumCall<AsyncLogger,Void> accept = loggers.acceptRecovery(logToSync, syncFromUrl);
     loggers.waitForWriteQuorum(accept, acceptRecoveryTimeoutMs,
         "acceptRecovery(" + TextFormat.shortDebugString(logToSync) + ")");
 
-    // TODO:
-    // we should only try to finalize loggers who successfully synced above
-    // eg if a logger was down, we don't want to send the finalize request.
-    // write a test for this!
+    // If one of the loggers above missed the synchronization step above, but
+    // we send a finalize() here, that's OK. It validates the log before
+    // finalizing. Hence, even if it is not "in sync", it won't incorrectly
+    // finalize.
     QuorumCall<AsyncLogger, Void> finalize =
         loggers.finalizeLogSegment(logToSync.getStartTxId(), logToSync.getEndTxId());
     loggers.waitForWriteQuorum(finalize, finalizeSegmentTimeoutMs,
@@ -386,7 +386,8 @@ public EditLogOutputStream startLogSegment(long txId) throws IOException {
     QuorumCall<AsyncLogger,Void> q = loggers.startLogSegment(txId);
     loggers.waitForWriteQuorum(q, startSegmentTimeoutMs,
         "startLogSegment(" + txId + ")");
-    return new QuorumOutputStream(loggers, txId);
+    return new QuorumOutputStream(loggers, txId,
+        outputBufferCapacity, writeTxnsTimeoutMs);
   }
 
   @Override
@@ -400,8 +401,7 @@ public void finalizeLogSegment(long firstTxId, long lastTxId)
 
   @Override
   public void setOutputBufferCapacity(int size) {
-    // TODO Auto-generated method stub
-
+    outputBufferCapacity = size;
   }
 
   @Override
@@ -416,9 +416,14 @@ public void purgeLogsOlderThan(long minTxIdToKeep) throws IOException {
   public void recoverUnfinalizedSegments() throws IOException {
     Preconditions.checkState(!isActiveWriter, "already active writer");
 
+    LOG.info("Starting recovery process for unclosed journal segments...");
     Map<AsyncLogger, NewEpochResponseProto> resps = createNewUniqueEpoch();
-    LOG.info("newEpoch(" + loggers.getEpoch() + ") responses:\n" +
-        QuorumCall.mapToString(resps));
+    LOG.info("Successfully started new epoch " + loggers.getEpoch());
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("newEpoch(" + loggers.getEpoch() + ") responses:\n" +
+          QuorumCall.mapToString(resps));
+    }
 
     long mostRecentSegmentTxId = Long.MIN_VALUE;
     for (NewEpochResponseProto r : resps.values()) {
@@ -476,7 +481,7 @@ public void selectInputStreams(Collection<EditLogInputStream> streams,
 
   @Override
   public String toString() {
-    return "Quorum journal manager " + uri;
+    return "QJM to " + loggers;
   }
 
   @VisibleForTesting
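Note (illustrative, not part of the patch): the recoverUnfinalizedSegments() change above gates the verbose per-logger dump behind LOG.isDebugEnabled(). A minimal commons-logging sketch of the same idiom, with made-up class name and messages:

    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;

    public class DebugGuardExample {
      private static final Log LOG = LogFactory.getLog(DebugGuardExample.class);

      public static void main(String[] args) {
        LOG.info("Successfully started new epoch 3");
        if (LOG.isDebugEnabled()) {
          // The expensive report formatting only runs when debug logging is on.
          LOG.debug("newEpoch(3) responses:\n" + buildBigReport());
        }
      }

      private static String buildBigReport() {
        return "journalnode1: ok\njournalnode2: ok\njournalnode3: ok";
      }
    }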
@@ -32,13 +32,16 @@ class QuorumOutputStream extends EditLogOutputStream {
   private final AsyncLoggerSet loggers;
   private EditsDoubleBuffer buf;
   private final long segmentTxId;
+  private final int writeTimeoutMs;
 
   public QuorumOutputStream(AsyncLoggerSet loggers,
-      long txId) throws IOException {
+      long txId, int outputBufferCapacity,
+      int writeTimeoutMs) throws IOException {
     super();
-    this.buf = new EditsDoubleBuffer(256*1024); // TODO: conf
+    this.buf = new EditsDoubleBuffer(outputBufferCapacity);
     this.loggers = loggers;
     this.segmentTxId = txId;
+    this.writeTimeoutMs = writeTimeoutMs;
   }
 
   @Override
@@ -101,7 +104,7 @@ protected void flushAndSync(boolean durable) throws IOException {
       QuorumCall<AsyncLogger, Void> qcall = loggers.sendEdits(
           segmentTxId, firstTxToFlush,
           numReadyTxns, data);
-      loggers.waitForWriteQuorum(qcall, 20000, "sendEdits"); // TODO: configurable timeout
+      loggers.waitForWriteQuorum(qcall, writeTimeoutMs, "sendEdits");
 
       // Since we successfully wrote this batch, let the loggers know. Any future
       // RPCs will thus let the loggers know of the most recent transaction, even
@@ -118,4 +121,8 @@ public String generateHtmlReport() {
     return sb.toString();
   }
 
+  @Override
+  public String toString() {
+    return "QuorumOutputStream starting at txid " + segmentTxId;
+  }
 }
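Note (illustrative, not part of the patch): the constructor change replaces two hard-coded values (256 KB buffer, 20 s quorum-write timeout) with values threaded in from configuration. A rough, self-contained sketch of that dependency-injection pattern, using hypothetical class and method names rather than the Hadoop ones:

    /** Hypothetical sketch: sizing and timeout policy are passed in, not hard-coded. */
    public class TimedBufferedSink {
      private final byte[] buf;
      private final int writeTimeoutMs;
      private int pos;

      public TimedBufferedSink(int bufferCapacity, int writeTimeoutMs) {
        this.buf = new byte[bufferCapacity];   // previously a fixed 256 * 1024
        this.writeTimeoutMs = writeTimeoutMs;  // previously a fixed 20000
      }

      public void write(byte b) {
        buf[pos++] = b;
      }

      public void flush() {
        // A real implementation would wait up to writeTimeoutMs for a quorum ack here.
        System.out.println("flushing " + pos + " bytes, timeout " + writeTimeoutMs + "ms");
        pos = 0;
      }

      public static void main(String[] args) {
        TimedBufferedSink sink = new TimedBufferedSink(512 * 1024, 20000);
        sink.write((byte) 1);
        sink.flush();
      }
    }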
@@ -37,18 +37,15 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.hdfs.HAUtil;
 import org.apache.hadoop.hdfs.qjournal.client.QuorumJournalManager;
 import org.apache.hadoop.hdfs.server.common.JspHelper;
 import org.apache.hadoop.hdfs.server.namenode.FileJournalManager;
 import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;
 import org.apache.hadoop.hdfs.server.namenode.GetImageServlet;
-import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.SecondaryNameNode;
 import org.apache.hadoop.hdfs.server.namenode.TransferFsImage;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.util.DataTransferThrottler;
-import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.ServletUtil;
@@ -28,8 +28,6 @@
 import org.apache.hadoop.hdfs.server.namenode.NNStorage;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 
-import com.google.common.base.Preconditions;
-
 /**
  * A {@link Storage} implementation for the {@link JournalNode}.
  *
@@ -52,6 +52,7 @@
 import org.apache.hadoop.hdfs.util.BestEffortLongFile;
 import org.apache.hadoop.hdfs.util.PersistentLongFile;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.security.SecurityUtil;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -87,6 +88,17 @@ class Journal implements Closeable {
    * number of that writer is stored persistently on disk.
    */
   private PersistentLongFile lastPromisedEpoch;
+
+  /**
+   * Each IPC that comes from a given client contains a serial number
+   * which only increases from the client's perspective. Whenever
+   * we switch epochs, we reset this back to -1. Whenever an IPC
+   * comes from a client, we ensure that it is strictly higher
+   * than any previous IPC. This guards against any bugs in the IPC
+   * layer that would re-order IPCs or cause a stale retry from an old
+   * request to resurface and confuse things.
+   */
+  private long currentEpochIpcSerial = -1;
 
   /**
    * The epoch number of the last writer to actually write a transaction.
@@ -262,13 +274,15 @@ synchronized NewEpochResponseProto newEpoch(
 
     checkFormatted();
     storage.checkConsistentNamespace(nsInfo);
 
+    // Check that the new epoch being proposed is in fact newer than
+    // any other that we've promised.
     if (epoch <= getLastPromisedEpoch()) {
       throw new IOException("Proposed epoch " + epoch + " <= last promise " +
           getLastPromisedEpoch());
     }
 
-    lastPromisedEpoch.set(epoch);
+    updateLastPromisedEpoch(epoch);
     abortCurSegment();
 
     NewEpochResponseProto.Builder builder =
@@ -283,6 +297,16 @@ synchronized NewEpochResponseProto newEpoch(
     return builder.build();
   }
 
+  private void updateLastPromisedEpoch(long newEpoch) throws IOException {
+    LOG.info("Updating lastPromisedEpoch from " + lastPromisedEpoch.get() +
+        " to " + newEpoch + " for client " + Server.getRemoteIp());
+    lastPromisedEpoch.set(newEpoch);
+
+    // Since we have a new writer, reset the IPC serial - it will start
+    // counting again from 0 for this writer.
+    currentEpochIpcSerial = -1;
+  }
+
   private void abortCurSegment() throws IOException {
     if (curSegment == null) {
       return;
@@ -372,14 +396,19 @@ private synchronized void checkRequest(RequestInfo reqInfo) throws IOException {
       throw new IOException("IPC's epoch " + reqInfo.getEpoch() +
           " is less than the last promised epoch " +
           lastPromisedEpoch.get());
+    } else if (reqInfo.getEpoch() > lastPromisedEpoch.get()) {
+      // A newer client has arrived. Fence any previous writers by updating
+      // the promise.
+      updateLastPromisedEpoch(reqInfo.getEpoch());
     }
 
-    // TODO: should other requests check the _exact_ epoch instead of
-    // the <= check? <= should probably only be necessary for the
-    // first calls
-
-    // TODO: some check on serial number that they only increase from a given
-    // client
+    // Ensure that the IPCs are arriving in-order as expected.
+    checkSync(reqInfo.getIpcSerialNumber() > currentEpochIpcSerial,
+        "IPC serial %s from client %s was not higher than prior highest " +
+        "IPC serial %s", reqInfo.getIpcSerialNumber(),
+        Server.getRemoteIp(),
+        currentEpochIpcSerial);
+    currentEpochIpcSerial = reqInfo.getIpcSerialNumber();
 
     if (reqInfo.hasCommittedTxId()) {
       Preconditions.checkArgument(
@@ -424,6 +453,22 @@ private void checkSync(boolean expression, String msg,
     }
   }
 
+  /**
+   * @throws AssertionError if the given expression is not true.
+   * The message of the exception is formatted using the 'msg' and
+   * 'formatArgs' parameters.
+   *
+   * This should be used in preference to Java's built-in assert in
+   * non-performance-critical paths, where a failure of this invariant
+   * might cause the protocol to lose data.
+   */
+  private void alwaysAssert(boolean expression, String msg,
+      Object... formatArgs) {
+    if (!expression) {
+      throw new AssertionError(String.format(msg, formatArgs));
+    }
+  }
+
   /**
    * Start a new segment at the given txid. The previous segment
    * must have already been finalized.
@@ -466,7 +511,9 @@ public synchronized void startLogSegment(RequestInfo reqInfo, long txid)
 
     long curLastWriterEpoch = lastWriterEpoch.get();
     if (curLastWriterEpoch != reqInfo.getEpoch()) {
-      LOG.info("Recording lastWriterEpoch = " + reqInfo.getEpoch());
+      LOG.info("Updating lastWriterEpoch from " + curLastWriterEpoch +
+          " to " + reqInfo.getEpoch() + " for client " +
+          Server.getRemoteIp());
       lastWriterEpoch.set(reqInfo.getEpoch());
     }
 
@@ -689,9 +736,8 @@ public synchronized void acceptRecovery(RequestInfo reqInfo,
 
     long segmentTxId = segment.getStartTxId();
 
-    // TODO: right now, a recovery of a segment when the log is
-    // completely emtpy (ie startLogSegment() but no txns)
-    // will fail this assertion here, since endTxId < startTxId
+    // Basic sanity checks that the segment is well-formed and contains
+    // at least one transaction.
     Preconditions.checkArgument(segment.getEndTxId() > 0 &&
         segment.getEndTxId() >= segmentTxId,
         "bad recovery state for segment %s: %s",
@@ -702,8 +748,12 @@ public synchronized void acceptRecovery(RequestInfo reqInfo,
         .setAcceptedInEpoch(reqInfo.getEpoch())
         .setSegmentState(segment)
         .build();
+
+    // If we previously acted on acceptRecovery() from a higher-numbered writer,
+    // this call is out of sync. We should never actually trigger this, since the
+    // checkRequest() call above should filter non-increasing epoch numbers.
     if (oldData != null) {
-      Preconditions.checkState(oldData.getAcceptedInEpoch() <= reqInfo.getEpoch(),
+      alwaysAssert(oldData.getAcceptedInEpoch() <= reqInfo.getEpoch(),
          "Bad paxos transition, out-of-order epochs.\nOld: %s\nNew: %s\n",
          oldData, newData);
     }
@@ -737,6 +787,12 @@ public synchronized void acceptRecovery(RequestInfo reqInfo,
           committedTxnId.get());
     }
 
+    // Another paranoid check: we should not be asked to synchronize a log
+    // on top of a finalized segment.
+    alwaysAssert(currentSegment.getIsInProgress(),
+        "Should never be asked to synchronize a different log on top of an " +
+        "already-finalized segment");
+
     // If we're shortening the log, update our highest txid
     // used for lag metrics.
     if (txnRange(currentSegment).contains(highestWrittenTxId)) {
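Note (illustrative, not part of the patch): the new epoch/serial handling in checkRequest() enforces two rules — a request from an epoch older than the last promise is rejected, a newer epoch fences prior writers, and within an epoch each IPC serial must strictly increase. A self-contained toy model of those rules (hypothetical class, not the Journal code itself):

    import java.io.IOException;

    /** Hypothetical standalone model of the epoch/serial checks described above. */
    public class EpochSerialGuard {
      private long lastPromisedEpoch = 0;
      private long currentEpochIpcSerial = -1;

      public synchronized void checkRequest(long epoch, long ipcSerial) throws IOException {
        if (epoch < lastPromisedEpoch) {
          throw new IOException("IPC's epoch " + epoch +
              " is less than the last promised epoch " + lastPromisedEpoch);
        } else if (epoch > lastPromisedEpoch) {
          // A newer writer has arrived: fence older writers and restart the serial count.
          lastPromisedEpoch = epoch;
          currentEpochIpcSerial = -1;
        }
        if (ipcSerial <= currentEpochIpcSerial) {
          throw new IOException("IPC serial " + ipcSerial +
              " was not higher than prior highest IPC serial " + currentEpochIpcSerial);
        }
        currentEpochIpcSerial = ipcSerial;
      }

      public static void main(String[] args) throws IOException {
        EpochSerialGuard guard = new EpochSerialGuard();
        guard.checkRequest(1, 0);
        guard.checkRequest(1, 1);
        guard.checkRequest(2, 0);   // a new epoch resets the serial
        try {
          guard.checkRequest(1, 2); // a stale epoch is rejected
        } catch (IOException expected) {
          System.out.println("rejected: " + expected.getMessage());
        }
      }
    }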
@@ -24,7 +24,6 @@
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.metrics2.lib.MetricsRegistry;
 import org.apache.hadoop.metrics2.lib.MutableCounterLong;
-import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
 import org.apache.hadoop.metrics2.lib.MutableQuantiles;
 
 /**
@@ -70,11 +70,14 @@ class JournalNodeRpcServer implements QJournalProtocol {
     BlockingService service = QJournalProtocolService
         .newReflectiveBlockingService(translator);
 
-    this.server = RPC.getServer(
-        QJournalProtocolPB.class,
-        service, addr.getHostName(),
-        addr.getPort(), HANDLER_COUNT, false, confCopy,
-        null /*secretManager*/);
+    this.server = new RPC.Builder(confCopy)
+        .setProtocol(QJournalProtocolPB.class)
+        .setInstance(service)
+        .setBindAddress(addr.getHostName())
+        .setPort(addr.getPort())
+        .setNumHandlers(HANDLER_COUNT)
+        .setVerbose(false)
+        .build();
 
     // set service-level authorization security policy
     if (confCopy.getBoolean(
@@ -57,7 +57,9 @@ public long get() throws IOException {
   }
 
   public void set(long newVal) throws IOException {
-    writeFile(file, newVal);
+    if (value != newVal || !loaded) {
+      writeFile(file, newVal);
+    }
     value = newVal;
     loaded = true;
   }
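Note (illustrative, not part of the patch): the set() guard above skips rewriting the backing file when the cached value already matches, avoiding a redundant disk write. A minimal sketch of the same idea as a hypothetical standalone class (not the Hadoop utility):

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;

    /** Hypothetical persistent long value that avoids redundant writes. */
    public class CachedLongFile {
      private final Path file;
      private long value;
      private boolean loaded;

      public CachedLongFile(Path file) {
        this.file = file;
      }

      public void set(long newVal) throws IOException {
        // Only touch the disk if the value changed or we never wrote it ourselves.
        if (value != newVal || !loaded) {
          Files.write(file, Long.toString(newVal).getBytes(StandardCharsets.UTF_8));
        }
        value = newVal;
        loaded = true;
      }

      public static void main(String[] args) throws IOException {
        CachedLongFile epoch = new CachedLongFile(Paths.get("/tmp/epoch.txt"));
        epoch.set(5);  // writes
        epoch.set(5);  // no-op: value unchanged
        epoch.set(6);  // writes again
      }
    }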
@@ -211,7 +211,7 @@ public void testWebPageHasQjmInfo() throws Exception {
     cluster.getFileSystem().mkdirs(TEST_PATH);
 
     String contents = DFSTestUtil.urlGet(url);
-    assertTrue(contents.contains("Channel to journal node"));
+    assertTrue(contents.contains("QJM to ["));
     assertTrue(contents.contains("Written txid 2"));
 
     // Stop one JN, do another txn, and make sure it shows as behind
@@ -22,7 +22,6 @@
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.URI;
-import java.util.List;
 import java.util.Random;
 
 import org.apache.commons.logging.Log;
@@ -30,7 +29,6 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.qjournal.MiniJournalCluster;
 import org.apache.hadoop.hdfs.qjournal.client.AsyncLogger;
-import org.apache.hadoop.hdfs.qjournal.client.AsyncLoggerSet;
 import org.apache.hadoop.hdfs.qjournal.client.QuorumJournalManager;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.junit.Test;
@@ -38,7 +36,6 @@
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
-import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 
@@ -74,6 +74,8 @@ public class TestQJMWithFaults {
   private static final int SEGMENTS_PER_WRITER = 2;
 
   private static Configuration conf = new Configuration();
+
+
   static {
     // Don't retry connections - it just slows down the tests.
     conf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
@@ -799,6 +799,13 @@ public void testPurgeLogs() throws Exception {
         "3");
   }
 
+  @Test
+  public void testToString() throws Exception {
+    GenericTestUtils.assertMatches(
+        qjm.toString(),
+        "QJM to \\[127.0.0.1:\\d+, 127.0.0.1:\\d+, 127.0.0.1:\\d+\\]");
+  }
+
   private QuorumJournalManager createSpyingQJM()
       throws IOException, URISyntaxException {
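Note (illustrative, not part of the patch): the new testToString asserts against a regular expression for the "QJM to [ip:port, ...]" form. A quick standalone check of the same pattern with plain java.util.regex; the sample ports below are made up, the real test matches whatever ports the MiniJournalCluster chose:

    import java.util.regex.Pattern;

    public class QjmToStringPatternCheck {
      public static void main(String[] args) {
        String pattern = "QJM to \\[127.0.0.1:\\d+, 127.0.0.1:\\d+, 127.0.0.1:\\d+\\]";
        String sample = "QJM to [127.0.0.1:8485, 127.0.0.1:8486, 127.0.0.1:8487]";
        // find() locates the pattern as a substring, which is enough here.
        System.out.println(Pattern.compile(pattern).matcher(sample).find());  // prints: true
      }
    }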
@@ -177,7 +177,7 @@ public void testNewEpochAtBeginningOfSegment() throws Exception {
     journal.journal(makeRI(2), 1, 1, 2,
         QJMTestUtil.createTxnData(1, 2));
     journal.finalizeLogSegment(makeRI(3), 1, 2);
-    journal.startLogSegment(makeRI(3), 3);
+    journal.startLogSegment(makeRI(4), 3);
     NewEpochResponseProto resp = journal.newEpoch(FAKE_NSINFO, 2);
     assertEquals(1, resp.getLastSegmentTxId());
   }
@@ -240,7 +240,6 @@ public void testAcceptRecoveryBehavior() throws Exception {
     assertTrue(prep.hasSegmentState());
 
     // accept() should save the accepted value in persistent storage
-    // TODO: should be able to accept without a URL here
     ch.acceptRecovery(prep.getSegmentState(), new URL("file:///dev/null")).get();
 
     // So another prepare() call from a new epoch would return this value
@@ -326,12 +325,7 @@ private void doPerfTest(int editsSize, int numEdits) throws Exception {
         " bytes in " + time + "ms");
     float avgRtt = (float)time/(float)numEdits;
     long throughput = ((long)numEdits * editsSize * 1000L)/time;
-    System.err.println("Time per batch: " + avgRtt);
+    System.err.println("Time per batch: " + avgRtt + "ms");
     System.err.println("Throughput: " + throughput + " bytes/sec");
   }
-
-  // TODO:
-  // - add test that checks formatting behavior
-  // - add test that checks rejects newEpoch if nsinfo doesn't match
-
 }