HDFS-3901. QJM: send 'heartbeat' messages to JNs even when they are out-of-sync. Contributed by Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-3077@1383137 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Todd Lipcon 2012-09-10 22:30:52 +00:00
parent 60c20e559b
commit 959afc0fd3
12 changed files with 215 additions and 21 deletions

View File

@ -58,3 +58,5 @@ HDFS-3898. QJM: enable TCP_NODELAY for IPC (todd)
HDFS-3885. QJM: optimize log sync when JN is lagging behind (todd)
HDFS-3900. QJM: avoid validating log segments on log rolls (todd)
HDFS-3901. QJM: send 'heartbeat' messages to JNs even when they are out-of-sync (todd)

View File

@ -26,6 +26,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
@ -52,6 +53,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
@ -92,6 +94,19 @@ public class IPCLoggerChannel implements AsyncLogger {
* The highest txid that has been successfully logged on the remote JN.
*/
private long highestAckedTxId = 0;
/**
* Nanotime of the last time we successfully journaled some edits
* to the remote node.
*/
private long lastAckNanos = 0;
/**
* Nanotime of the last time that committedTxId was update. Used
* to calculate the lag in terms of time, rather than just a number
* of txns.
*/
private long lastCommitNanos = 0;
/**
* The maximum number of bytes that can be pending in the queue.
@ -109,6 +124,13 @@ public class IPCLoggerChannel implements AsyncLogger {
*/
private boolean outOfSync = false;
/**
* Stopwatch which starts counting on each heartbeat that is sent
*/
private Stopwatch lastHeartbeatStopwatch = new Stopwatch();
private static final long HEARTBEAT_INTERVAL_MILLIS = 1000;
static final Factory FACTORY = new AsyncLogger.Factory() {
@Override
public AsyncLogger createLogger(Configuration conf, NamespaceInfo nsInfo,
@ -145,6 +167,7 @@ public synchronized void setCommittedTxId(long txid) {
"Trying to move committed txid backwards in client " +
"old: %s new: %s", committedTxId, txid);
this.committedTxId = txid;
this.lastCommitNanos = System.nanoTime();
}
@Override
@ -295,6 +318,11 @@ public ListenableFuture<Void> sendEdits(
} catch (LoggerTooFarBehindException e) {
return Futures.immediateFailedFuture(e);
}
// When this batch is acked, we use its submission time in order
// to calculate how far we are lagging.
final long submitNanos = System.nanoTime();
ListenableFuture<Void> ret = null;
try {
ret = executor.submit(new Callable<Void>() {
@ -318,6 +346,7 @@ public Void call() throws IOException {
}
synchronized (IPCLoggerChannel.this) {
highestAckedTxId = firstTxnId + numTxns - 1;
lastAckNanos = submitNanos;
}
return null;
}
@ -347,15 +376,40 @@ public void onSuccess(Void t) {
return ret;
}
private synchronized void throwIfOutOfSync() throws JournalOutOfSyncException {
if (outOfSync) {
// TODO: send a "heartbeat" here so that the remote node knows the newest
// committed txid, for metrics purposes
private void throwIfOutOfSync()
throws JournalOutOfSyncException, IOException {
if (isOutOfSync()) {
// Even if we're out of sync, it's useful to send an RPC
// to the remote node in order to update its lag metrics, etc.
heartbeatIfNecessary();
throw new JournalOutOfSyncException(
"Journal disabled until next roll");
}
}
/**
* When we've entered an out-of-sync state, it's still useful to periodically
* send an empty RPC to the server, such that it has the up to date
* committedTxId. This acts as a sanity check during recovery, and also allows
* that node's metrics to be up-to-date about its lag.
*
* In the future, this method may also be used in order to check that the
* current node is still the current writer, even if no edits are being
* written.
*/
private void heartbeatIfNecessary() throws IOException {
if (lastHeartbeatStopwatch.elapsedMillis() > HEARTBEAT_INTERVAL_MILLIS ||
!lastHeartbeatStopwatch.isRunning()) {
try {
getProxy().heartbeat(createReqInfo());
} finally {
// Don't send heartbeats more often than the configured interval,
// even if they fail.
lastHeartbeatStopwatch.reset().start();
}
}
}
private synchronized void reserveQueueSpace(int size)
throws LoggerTooFarBehindException {
Preconditions.checkArgument(size >= 0);
@ -479,13 +533,27 @@ public String toString() {
@Override
public synchronized void appendHtmlReport(StringBuilder sb) {
sb.append("Written txid ").append(highestAckedTxId);
long behind = committedTxId - highestAckedTxId;
assert behind >= 0;
long behind = getLagTxns();
if (behind > 0) {
sb.append(" (" + behind + " behind)");
if (lastAckNanos != 0) {
long lagMillis = getLagTimeMillis();
sb.append(" (" + behind + " txns/" + lagMillis + "ms behind)");
} else {
sb.append(" (never written");
}
}
if (outOfSync) {
sb.append(" (will re-join on next segment)");
sb.append(" (will try to re-sync on next segment)");
}
}
private long getLagTxns() {
return Math.max(committedTxId - highestAckedTxId, 0);
}
private long getLagTimeMillis() {
return TimeUnit.MILLISECONDS.convert(
Math.max(lastCommitNanos - lastAckNanos, 0),
TimeUnit.NANOSECONDS);
}
}

View File

@ -77,6 +77,15 @@ public void journal(RequestInfo reqInfo,
int numTxns,
byte[] records) throws IOException;
/**
* Heartbeat.
* This is a no-op on the server, except that it verifies that the
* caller is in fact still the active writer, and provides up-to-date
* information on the most recently committed txid.
*/
public void heartbeat(RequestInfo reqInfo) throws IOException;
/**
* Start writing to a new log segment on the JournalNode.
* Before calling this, one should finalize the previous segment

View File

@ -30,6 +30,8 @@
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateRequestProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.HeartbeatRequestProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.HeartbeatResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.JournalIdProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.JournalRequestProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.JournalResponseProto;
@ -118,6 +120,18 @@ public JournalResponseProto journal(RpcController unused,
return JournalResponseProto.newBuilder().build();
}
/** @see JournalProtocol#heartbeat */
@Override
public HeartbeatResponseProto heartbeat(RpcController controller,
HeartbeatRequestProto req) throws ServiceException {
try {
impl.heartbeat(convert(req.getReqInfo()));
} catch (IOException e) {
throw new ServiceException(e);
}
return HeartbeatResponseProto.getDefaultInstance();
}
/** @see JournalProtocol#startLogSegment */
@Override
public StartLogSegmentResponseProto startLogSegment(RpcController controller,

View File

@ -32,6 +32,7 @@
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateRequestProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.HeartbeatRequestProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.JournalIdProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.JournalRequestProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochRequestProto;
@ -141,6 +142,17 @@ public void journal(RequestInfo reqInfo,
throw ProtobufHelper.getRemoteException(e);
}
}
@Override
public void heartbeat(RequestInfo reqInfo) throws IOException {
try {
rpcProxy.heartbeat(NULL_CONTROLLER, HeartbeatRequestProto.newBuilder()
.setReqInfo(convert(reqInfo))
.build());
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
private QJournalProtocolProtos.RequestInfoProto convert(
RequestInfo reqInfo) {

View File

@ -75,6 +75,7 @@ class Journal implements Closeable {
private EditLogOutputStream curSegment;
private long curSegmentTxId = HdfsConstants.INVALID_TXID;
private long nextTxId = HdfsConstants.INVALID_TXID;
private long highestWrittenTxId = 0;
private final String journalId;
@ -123,6 +124,11 @@ class Journal implements Closeable {
this.fjm = storage.getJournalManager();
this.metrics = JournalMetrics.create(this);
EditLogFile latest = scanStorageForLatestEdits();
if (latest != null) {
highestWrittenTxId = latest.getLastTxId();
}
}
/**
@ -224,6 +230,19 @@ synchronized long getCommittedTxnIdForTests() throws IOException {
return committedTxnId.get();
}
synchronized long getCurrentLagTxns() throws IOException {
long committed = committedTxnId.get();
if (committed == 0) {
return 0;
}
return Math.max(committed - highestWrittenTxId, 0L);
}
synchronized long getHighestWrittenTxId() {
return highestWrittenTxId;
}
@VisibleForTesting
JournalMetrics getMetricsForTests() {
return metrics;
@ -329,19 +348,20 @@ synchronized void journal(RequestInfo reqInfo,
// This batch of edits has already been committed on a quorum of other
// nodes. So, we are in "catch up" mode. This gets its own metric.
metrics.batchesWrittenWhileLagging.incr(1);
metrics.currentLagTxns.set(committedTxnId.get() - lastTxnId);
} else {
metrics.currentLagTxns.set(0L);
}
metrics.batchesWritten.incr(1);
metrics.bytesWritten.incr(records.length);
metrics.txnsWritten.incr(numTxns);
metrics.lastWrittenTxId.set(lastTxnId);
nextTxId += numTxns;
highestWrittenTxId = lastTxnId;
nextTxId = lastTxnId + 1;
}
public void heartbeat(RequestInfo reqInfo) throws IOException {
checkRequest(reqInfo);
}
/**
* Ensure that the given request is coming from the correct writer and in-order.
* @param reqInfo the request info
@ -690,6 +710,10 @@ public synchronized void acceptRecovery(RequestInfo reqInfo,
if (currentSegment == null) {
LOG.info("Synchronizing log " + TextFormat.shortDebugString(segment) +
": no current segment in place");
// Update the highest txid for lag metrics
highestWrittenTxId = Math.max(segment.getEndTxId(),
highestWrittenTxId);
} else {
LOG.info("Synchronizing log " + TextFormat.shortDebugString(segment) +
": old segment " + TextFormat.shortDebugString(currentSegment) +
@ -708,8 +732,15 @@ public synchronized void acceptRecovery(RequestInfo reqInfo,
": would discard already-committed txn " +
committedTxnId.get());
}
// If we're shortening the log, update our highest txid
// used for lag metrics.
if (txnRange(currentSegment).contains(highestWrittenTxId)) {
highestWrittenTxId = segment.getEndTxId();
}
}
syncLog(reqInfo, segment, fromUrl);
} else {
LOG.info("Skipping download of log " +
TextFormat.shortDebugString(segment) +

View File

@ -51,12 +51,6 @@ class JournalMetrics {
MutableQuantiles[] syncsQuantiles;
@Metric("Transaction lag behind the most recent commit")
MutableGaugeLong currentLagTxns;
@Metric("Last written txid")
MutableGaugeLong lastWrittenTxId;
private final Journal journal;
JournalMetrics(Journal journal) {
@ -99,6 +93,20 @@ public long getLastPromisedEpoch() {
}
}
@Metric("The highest txid stored on this JN")
public long getLastWrittenTxId() {
return journal.getHighestWrittenTxId();
}
@Metric("Number of transactions that this JN is lagging")
public long getCurrentLagTxns() {
try {
return journal.getCurrentLagTxns();
} catch (IOException e) {
return -1L;
}
}
void addSync(long us) {
for (MutableQuantiles q : syncsQuantiles) {
q.add(us);

View File

@ -137,6 +137,12 @@ public void journal(RequestInfo reqInfo,
jn.getOrCreateJournal(reqInfo.getJournalId())
.journal(reqInfo, segmentTxId, firstTxnId, numTxns, records);
}
@Override
public void heartbeat(RequestInfo reqInfo) throws IOException {
jn.getOrCreateJournal(reqInfo.getJournalId())
.heartbeat(reqInfo);
}
@Override
public void startLogSegment(RequestInfo reqInfo, long txid)

View File

@ -71,6 +71,17 @@ message JournalRequestProto {
message JournalResponseProto {
}
/**
* heartbeat()
*/
message HeartbeatRequestProto {
required RequestInfoProto reqInfo = 1;
}
message HeartbeatResponseProto { // void response
}
/**
* startLogSegment()
*/
@ -207,6 +218,8 @@ service QJournalProtocolService {
rpc journal(JournalRequestProto) returns (JournalResponseProto);
rpc heartbeat(HeartbeatRequestProto) returns (HeartbeatResponseProto);
rpc startLogSegment(StartLogSegmentRequestProto)
returns (StartLogSegmentResponseProto);

View File

@ -22,8 +22,10 @@
import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
@ -193,6 +195,9 @@ public void testWebPageHasQjmInfo() throws Exception {
MiniDFSCluster.getBaseDirectory() + "/TestNNWithQJM/image");
conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY,
mjc.getQuorumJournalURI("myjournal").toString());
// Speed up the test
conf.setInt(
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 1);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(0)
@ -217,7 +222,18 @@ public void testWebPageHasQjmInfo() throws Exception {
contents = DFSTestUtil.urlGet(url);
System.out.println(contents);
assertTrue(contents.contains("(1 behind)"));
assertTrue(Pattern.compile("1 txns/\\d+ms behind").matcher(contents)
.find());
// Restart NN while JN0 is still down.
cluster.restartNameNode();
contents = DFSTestUtil.urlGet(url);
System.out.println(contents);
assertTrue(Pattern.compile("never written").matcher(contents)
.find());
} finally {
cluster.shutdown();
}

View File

@ -163,11 +163,14 @@ public void testStopSendingEditsWhenOutOfSync() throws Exception {
ee.getCause());
}
// It should have failed without even sending an RPC, since it was not sync.
// It should have failed without even sending the edits, since it was not sync.
Mockito.verify(mockProxy, Mockito.never()).journal(
Mockito.<RequestInfo>any(),
Mockito.eq(1L), Mockito.eq(2L),
Mockito.eq(1), Mockito.same(FAKE_DATA));
// It should have sent a heartbeat instead.
Mockito.verify(mockProxy).heartbeat(
Mockito.<RequestInfo>any());
// After a roll, sending new edits should not fail.
ch.startLogSegment(3L).get();

View File

@ -101,6 +101,7 @@ public void testJournal() throws Exception {
journal.getMetricsForTests().getName());
MetricsAsserts.assertCounter("BatchesWritten", 0L, metrics);
MetricsAsserts.assertCounter("BatchesWrittenWhileLagging", 0L, metrics);
MetricsAsserts.assertGauge("CurrentLagTxns", 0L, metrics);
IPCLoggerChannel ch = new IPCLoggerChannel(
conf, FAKE_NSINFO, JID, jn.getBoundIpcAddress());
@ -113,6 +114,17 @@ public void testJournal() throws Exception {
journal.getMetricsForTests().getName());
MetricsAsserts.assertCounter("BatchesWritten", 1L, metrics);
MetricsAsserts.assertCounter("BatchesWrittenWhileLagging", 0L, metrics);
MetricsAsserts.assertGauge("CurrentLagTxns", 0L, metrics);
ch.setCommittedTxId(100L);
ch.sendEdits(1L, 2, 1, "goodbye".getBytes(Charsets.UTF_8)).get();
metrics = MetricsAsserts.getMetrics(
journal.getMetricsForTests().getName());
MetricsAsserts.assertCounter("BatchesWritten", 2L, metrics);
MetricsAsserts.assertCounter("BatchesWrittenWhileLagging", 1L, metrics);
MetricsAsserts.assertGauge("CurrentLagTxns", 98L, metrics);
}