HDFS-8929. Add a metric to expose the timestamp of the last journal (Contributed by surendra singh lilhore)
This commit is contained in:
parent
a153b9601a
commit
94cf7ab9d2
@ -275,6 +275,7 @@ The server-side metrics for a journal from the JournalNode's perspective. Each m
|
|||||||
| `CurrentLagTxns` | The number of transactions that this JournalNode is lagging |
|
| `CurrentLagTxns` | The number of transactions that this JournalNode is lagging |
|
||||||
| `LastWrittenTxId` | The highest transaction id stored on this JournalNode |
|
| `LastWrittenTxId` | The highest transaction id stored on this JournalNode |
|
||||||
| `LastPromisedEpoch` | The last epoch number which this node has promised not to accept any lower epoch, or 0 if no promises have been made |
|
| `LastPromisedEpoch` | The last epoch number which this node has promised not to accept any lower epoch, or 0 if no promises have been made |
|
||||||
|
| `LastJournalTimestamp` | The timestamp of last successfully written transaction |
|
||||||
|
|
||||||
datanode
|
datanode
|
||||||
--------
|
--------
|
||||||
|
@ -921,6 +921,9 @@ Release 2.8.0 - UNRELEASED
|
|||||||
|
|
||||||
HDFS-8862. BlockManager#excessReplicateMap should use a HashMap. (yliu)
|
HDFS-8862. BlockManager#excessReplicateMap should use a HashMap. (yliu)
|
||||||
|
|
||||||
|
HDFS-8929. Add a metric to expose the timestamp of the last journal
|
||||||
|
(surendra singh lilhore via vinayakumarb)
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
|
||||||
HDFS-7501. TransactionsSinceLastCheckpoint can be negative on SBNs.
|
HDFS-7501. TransactionsSinceLastCheckpoint can be negative on SBNs.
|
||||||
|
@ -63,6 +63,7 @@
|
|||||||
import org.apache.hadoop.security.SecurityUtil;
|
import org.apache.hadoop.security.SecurityUtil;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.util.StopWatch;
|
import org.apache.hadoop.util.StopWatch;
|
||||||
|
import org.apache.hadoop.util.Time;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.base.Charsets;
|
import com.google.common.base.Charsets;
|
||||||
@ -132,6 +133,8 @@ public class Journal implements Closeable {
|
|||||||
|
|
||||||
private final JournalMetrics metrics;
|
private final JournalMetrics metrics;
|
||||||
|
|
||||||
|
private long lastJournalTimestamp = 0;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Time threshold for sync calls, beyond which a warning should be logged to the console.
|
* Time threshold for sync calls, beyond which a warning should be logged to the console.
|
||||||
*/
|
*/
|
||||||
@ -253,7 +256,11 @@ synchronized public long getLastWriterEpoch() throws IOException {
|
|||||||
synchronized long getCommittedTxnIdForTests() throws IOException {
|
synchronized long getCommittedTxnIdForTests() throws IOException {
|
||||||
return committedTxnId.get();
|
return committedTxnId.get();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
synchronized long getLastJournalTimestamp() {
|
||||||
|
return lastJournalTimestamp;
|
||||||
|
}
|
||||||
|
|
||||||
synchronized long getCurrentLagTxns() throws IOException {
|
synchronized long getCurrentLagTxns() throws IOException {
|
||||||
long committed = committedTxnId.get();
|
long committed = committedTxnId.get();
|
||||||
if (committed == 0) {
|
if (committed == 0) {
|
||||||
@ -411,6 +418,7 @@ synchronized void journal(RequestInfo reqInfo,
|
|||||||
|
|
||||||
updateHighestWrittenTxId(lastTxnId);
|
updateHighestWrittenTxId(lastTxnId);
|
||||||
nextTxId = lastTxnId + 1;
|
nextTxId = lastTxnId + 1;
|
||||||
|
lastJournalTimestamp = Time.now();
|
||||||
}
|
}
|
||||||
|
|
||||||
public void heartbeat(RequestInfo reqInfo) throws IOException {
|
public void heartbeat(RequestInfo reqInfo) throws IOException {
|
||||||
|
@ -109,7 +109,12 @@ public long getCurrentLagTxns() {
|
|||||||
return -1L;
|
return -1L;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Metric("The timestamp of last successfully written transaction")
|
||||||
|
public long getLastJournalTimestamp() {
|
||||||
|
return journal.getLastJournalTimestamp();
|
||||||
|
}
|
||||||
|
|
||||||
void addSync(long us) {
|
void addSync(long us) {
|
||||||
for (MutableQuantiles q : syncsQuantiles) {
|
for (MutableQuantiles q : syncsQuantiles) {
|
||||||
q.add(us);
|
q.add(us);
|
||||||
|
@ -106,7 +106,9 @@ public void testJournal() throws Exception {
|
|||||||
MetricsAsserts.assertCounter("BatchesWritten", 0L, metrics);
|
MetricsAsserts.assertCounter("BatchesWritten", 0L, metrics);
|
||||||
MetricsAsserts.assertCounter("BatchesWrittenWhileLagging", 0L, metrics);
|
MetricsAsserts.assertCounter("BatchesWrittenWhileLagging", 0L, metrics);
|
||||||
MetricsAsserts.assertGauge("CurrentLagTxns", 0L, metrics);
|
MetricsAsserts.assertGauge("CurrentLagTxns", 0L, metrics);
|
||||||
|
MetricsAsserts.assertGauge("LastJournalTimestamp", 0L, metrics);
|
||||||
|
|
||||||
|
long beginTimestamp = System.currentTimeMillis();
|
||||||
IPCLoggerChannel ch = new IPCLoggerChannel(
|
IPCLoggerChannel ch = new IPCLoggerChannel(
|
||||||
conf, FAKE_NSINFO, journalId, jn.getBoundIpcAddress());
|
conf, FAKE_NSINFO, journalId, jn.getBoundIpcAddress());
|
||||||
ch.newEpoch(1).get();
|
ch.newEpoch(1).get();
|
||||||
@ -119,6 +121,10 @@ public void testJournal() throws Exception {
|
|||||||
MetricsAsserts.assertCounter("BatchesWritten", 1L, metrics);
|
MetricsAsserts.assertCounter("BatchesWritten", 1L, metrics);
|
||||||
MetricsAsserts.assertCounter("BatchesWrittenWhileLagging", 0L, metrics);
|
MetricsAsserts.assertCounter("BatchesWrittenWhileLagging", 0L, metrics);
|
||||||
MetricsAsserts.assertGauge("CurrentLagTxns", 0L, metrics);
|
MetricsAsserts.assertGauge("CurrentLagTxns", 0L, metrics);
|
||||||
|
long lastJournalTimestamp = MetricsAsserts.getLongGauge(
|
||||||
|
"LastJournalTimestamp", metrics);
|
||||||
|
assertTrue(lastJournalTimestamp > beginTimestamp);
|
||||||
|
beginTimestamp = lastJournalTimestamp;
|
||||||
|
|
||||||
ch.setCommittedTxId(100L);
|
ch.setCommittedTxId(100L);
|
||||||
ch.sendEdits(1L, 2, 1, "goodbye".getBytes(Charsets.UTF_8)).get();
|
ch.sendEdits(1L, 2, 1, "goodbye".getBytes(Charsets.UTF_8)).get();
|
||||||
@ -128,6 +134,9 @@ public void testJournal() throws Exception {
|
|||||||
MetricsAsserts.assertCounter("BatchesWritten", 2L, metrics);
|
MetricsAsserts.assertCounter("BatchesWritten", 2L, metrics);
|
||||||
MetricsAsserts.assertCounter("BatchesWrittenWhileLagging", 1L, metrics);
|
MetricsAsserts.assertCounter("BatchesWrittenWhileLagging", 1L, metrics);
|
||||||
MetricsAsserts.assertGauge("CurrentLagTxns", 98L, metrics);
|
MetricsAsserts.assertGauge("CurrentLagTxns", 98L, metrics);
|
||||||
|
lastJournalTimestamp = MetricsAsserts.getLongGauge(
|
||||||
|
"LastJournalTimestamp", metrics);
|
||||||
|
assertTrue(lastJournalTimestamp > beginTimestamp);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user