HDFS-2943. Expose last checkpoint time and transaction stats as JMX metrics. Contributed by Aaron T. Myers.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1243822 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
0515b3322f
commit
a87328dfab
@ -221,6 +221,9 @@ Release 0.23.2 - UNRELEASED
|
|||||||
|
|
||||||
NEW FEATURES
|
NEW FEATURES
|
||||||
|
|
||||||
|
HDFS-2943. Expose last checkpoint time and transaction stats as JMX
|
||||||
|
metrics. (atm)
|
||||||
|
|
||||||
IMPROVEMENTS
|
IMPROVEMENTS
|
||||||
|
|
||||||
HDFS-2931. Switch DataNode's BlockVolumeChoosingPolicy to private-audience.
|
HDFS-2931. Switch DataNode's BlockVolumeChoosingPolicy to private-audience.
|
||||||
|
@ -711,7 +711,7 @@ private void loadFSImage(File curFile, MD5Hash expectedMd5,
|
|||||||
long txId = loader.getLoadedImageTxId();
|
long txId = loader.getLoadedImageTxId();
|
||||||
LOG.info("Loaded image for txid " + txId + " from " + curFile);
|
LOG.info("Loaded image for txid " + txId + " from " + curFile);
|
||||||
lastAppliedTxId = txId;
|
lastAppliedTxId = txId;
|
||||||
storage.setMostRecentCheckpointTxId(txId);
|
storage.setMostRecentCheckpointInfo(txId, curFile.lastModified());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -728,7 +728,7 @@ void saveFSImage(SaveNamespaceContext context, StorageDirectory sd)
|
|||||||
saver.save(newFile, compression);
|
saver.save(newFile, compression);
|
||||||
|
|
||||||
MD5FileUtils.saveMD5File(dstFile, saver.getSavedDigest());
|
MD5FileUtils.saveMD5File(dstFile, saver.getSavedDigest());
|
||||||
storage.setMostRecentCheckpointTxId(txid);
|
storage.setMostRecentCheckpointInfo(txid, Util.now());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -1032,7 +1032,7 @@ synchronized void saveDigestAndRenameCheckpointImage(
|
|||||||
// advertise it as such to other checkpointers
|
// advertise it as such to other checkpointers
|
||||||
// from now on
|
// from now on
|
||||||
if (txid > storage.getMostRecentCheckpointTxId()) {
|
if (txid > storage.getMostRecentCheckpointTxId()) {
|
||||||
storage.setMostRecentCheckpointTxId(txid);
|
storage.setMostRecentCheckpointInfo(txid, Util.now());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2681,6 +2681,31 @@ public int getExpiredHeartbeats() {
|
|||||||
return datanodeStatistics.getExpiredHeartbeats();
|
return datanodeStatistics.getExpiredHeartbeats();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Metric({"TransactionsSinceLastCheckpoint",
|
||||||
|
"Number of transactions since last checkpoint"})
|
||||||
|
public long getTransactionsSinceLastCheckpoint() {
|
||||||
|
return getEditLog().getLastWrittenTxId() -
|
||||||
|
getFSImage().getStorage().getMostRecentCheckpointTxId();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Metric({"TransactionsSinceLastLogRoll",
|
||||||
|
"Number of transactions since last edit log roll"})
|
||||||
|
public long getTransactionsSinceLastLogRoll() {
|
||||||
|
return (getEditLog().getLastWrittenTxId() -
|
||||||
|
getEditLog().getCurSegmentTxId()) + 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Metric({"LastWrittenTransactionId", "Transaction ID written to the edit log"})
|
||||||
|
public long getLastWrittenTransactionId() {
|
||||||
|
return getEditLog().getLastWrittenTxId();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Metric({"LastCheckpointTime",
|
||||||
|
"Time in milliseconds since the epoch of the last checkpoint"})
|
||||||
|
public long getLastCheckpointTime() {
|
||||||
|
return getFSImage().getStorage().getMostRecentCheckpointTime();
|
||||||
|
}
|
||||||
|
|
||||||
/** @see ClientProtocol#getStats() */
|
/** @see ClientProtocol#getStats() */
|
||||||
long[] getStats() {
|
long[] getStats() {
|
||||||
final long[] stats = datanodeStatistics.getStats();
|
final long[] stats = datanodeStatistics.getStats();
|
||||||
|
@ -128,6 +128,11 @@ public boolean isOfType(StorageDirType type) {
|
|||||||
*/
|
*/
|
||||||
protected long mostRecentCheckpointTxId = HdfsConstants.INVALID_TXID;
|
protected long mostRecentCheckpointTxId = HdfsConstants.INVALID_TXID;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Time of the last checkpoint, in milliseconds since the epoch.
|
||||||
|
*/
|
||||||
|
private long mostRecentCheckpointTime = 0;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* list of failed (and thus removed) storages
|
* list of failed (and thus removed) storages
|
||||||
*/
|
*/
|
||||||
@ -440,19 +445,30 @@ void writeTransactionIdFile(StorageDirectory sd, long txid) throws IOException {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Set the transaction ID of the last checkpoint
|
* Set the transaction ID and time of the last checkpoint
|
||||||
|
*
|
||||||
|
* @param txid transaction id of the last checkpoint
|
||||||
|
* @param time time of the last checkpoint, in millis since the epoch
|
||||||
*/
|
*/
|
||||||
void setMostRecentCheckpointTxId(long txid) {
|
void setMostRecentCheckpointInfo(long txid, long time) {
|
||||||
this.mostRecentCheckpointTxId = txid;
|
this.mostRecentCheckpointTxId = txid;
|
||||||
|
this.mostRecentCheckpointTime = time;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Return the transaction ID of the last checkpoint.
|
* @return the transaction ID of the last checkpoint.
|
||||||
*/
|
*/
|
||||||
long getMostRecentCheckpointTxId() {
|
long getMostRecentCheckpointTxId() {
|
||||||
return mostRecentCheckpointTxId;
|
return mostRecentCheckpointTxId;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return the time of the most recent checkpoint in millis since the epoch.
|
||||||
|
*/
|
||||||
|
long getMostRecentCheckpointTime() {
|
||||||
|
return mostRecentCheckpointTime;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Write a small file in all available storage directories that
|
* Write a small file in all available storage directories that
|
||||||
* indicates that the namespace has reached some given transaction ID.
|
* indicates that the namespace has reached some given transaction ID.
|
||||||
|
@ -20,13 +20,12 @@
|
|||||||
import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
|
import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
|
||||||
import static org.apache.hadoop.test.MetricsAsserts.assertGauge;
|
import static org.apache.hadoop.test.MetricsAsserts.assertGauge;
|
||||||
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
|
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
|
||||||
|
import static org.junit.Assert.*;
|
||||||
|
|
||||||
import java.io.DataInputStream;
|
import java.io.DataInputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
|
|
||||||
import junit.framework.TestCase;
|
|
||||||
|
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.commons.logging.impl.Log4JLogger;
|
import org.apache.commons.logging.impl.Log4JLogger;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
@ -39,17 +38,21 @@
|
|||||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
|
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
|
||||||
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
|
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
|
||||||
import org.apache.hadoop.test.MetricsAsserts;
|
import org.apache.hadoop.test.MetricsAsserts;
|
||||||
import org.apache.log4j.Level;
|
import org.apache.log4j.Level;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test for metrics published by the Namenode
|
* Test for metrics published by the Namenode
|
||||||
*/
|
*/
|
||||||
public class TestNameNodeMetrics extends TestCase {
|
public class TestNameNodeMetrics {
|
||||||
private static final Configuration CONF = new HdfsConfiguration();
|
private static final Configuration CONF = new HdfsConfiguration();
|
||||||
private static final int DFS_REPLICATION_INTERVAL = 1;
|
private static final int DFS_REPLICATION_INTERVAL = 1;
|
||||||
private static final Path TEST_ROOT_DIR_PATH =
|
private static final Path TEST_ROOT_DIR_PATH =
|
||||||
@ -81,8 +84,8 @@ private static Path getTestPath(String fileName) {
|
|||||||
return new Path(TEST_ROOT_DIR_PATH, fileName);
|
return new Path(TEST_ROOT_DIR_PATH, fileName);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Before
|
||||||
protected void setUp() throws Exception {
|
public void setUp() throws Exception {
|
||||||
cluster = new MiniDFSCluster.Builder(CONF).numDataNodes(DATANODE_COUNT).build();
|
cluster = new MiniDFSCluster.Builder(CONF).numDataNodes(DATANODE_COUNT).build();
|
||||||
cluster.waitActive();
|
cluster.waitActive();
|
||||||
namesystem = cluster.getNamesystem();
|
namesystem = cluster.getNamesystem();
|
||||||
@ -90,8 +93,8 @@ protected void setUp() throws Exception {
|
|||||||
fs = (DistributedFileSystem) cluster.getFileSystem();
|
fs = (DistributedFileSystem) cluster.getFileSystem();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@After
|
||||||
protected void tearDown() throws Exception {
|
public void tearDown() throws Exception {
|
||||||
cluster.shutdown();
|
cluster.shutdown();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -115,6 +118,7 @@ private void readFile(FileSystem fileSys,Path name) throws IOException {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/** Test metrics associated with addition of a file */
|
/** Test metrics associated with addition of a file */
|
||||||
|
@Test
|
||||||
public void testFileAdd() throws Exception {
|
public void testFileAdd() throws Exception {
|
||||||
// Add files with 100 blocks
|
// Add files with 100 blocks
|
||||||
final Path file = getTestPath("testFileAdd");
|
final Path file = getTestPath("testFileAdd");
|
||||||
@ -159,6 +163,7 @@ public void testFileAdd() throws Exception {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/** Corrupt a block and ensure metrics reflect it */
|
/** Corrupt a block and ensure metrics reflect it */
|
||||||
|
@Test
|
||||||
public void testCorruptBlock() throws Exception {
|
public void testCorruptBlock() throws Exception {
|
||||||
// Create a file with single block with two replicas
|
// Create a file with single block with two replicas
|
||||||
final Path file = getTestPath("testCorruptBlock");
|
final Path file = getTestPath("testCorruptBlock");
|
||||||
@ -184,6 +189,7 @@ public void testCorruptBlock() throws Exception {
|
|||||||
/** Create excess blocks by reducing the replication factor for
|
/** Create excess blocks by reducing the replication factor for
|
||||||
* a file and ensure metrics reflect it
|
* a file and ensure metrics reflect it
|
||||||
*/
|
*/
|
||||||
|
@Test
|
||||||
public void testExcessBlocks() throws Exception {
|
public void testExcessBlocks() throws Exception {
|
||||||
Path file = getTestPath("testExcessBlocks");
|
Path file = getTestPath("testExcessBlocks");
|
||||||
createFile(file, 100, (short)2);
|
createFile(file, 100, (short)2);
|
||||||
@ -196,6 +202,7 @@ public void testExcessBlocks() throws Exception {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/** Test to ensure metrics reflect missing blocks */
|
/** Test to ensure metrics reflect missing blocks */
|
||||||
|
@Test
|
||||||
public void testMissingBlock() throws Exception {
|
public void testMissingBlock() throws Exception {
|
||||||
// Create a file with single block with two replicas
|
// Create a file with single block with two replicas
|
||||||
Path file = getTestPath("testMissingBlocks");
|
Path file = getTestPath("testMissingBlocks");
|
||||||
@ -220,6 +227,7 @@ private void waitForDeletion() throws InterruptedException {
|
|||||||
Thread.sleep(DFS_REPLICATION_INTERVAL * (DATANODE_COUNT + 1) * 1000);
|
Thread.sleep(DFS_REPLICATION_INTERVAL * (DATANODE_COUNT + 1) * 1000);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
public void testRenameMetrics() throws Exception {
|
public void testRenameMetrics() throws Exception {
|
||||||
Path src = getTestPath("src");
|
Path src = getTestPath("src");
|
||||||
createFile(src, 100, (short)1);
|
createFile(src, 100, (short)1);
|
||||||
@ -244,6 +252,7 @@ public void testRenameMetrics() throws Exception {
|
|||||||
*
|
*
|
||||||
* @throws IOException in case of an error
|
* @throws IOException in case of an error
|
||||||
*/
|
*/
|
||||||
|
@Test
|
||||||
public void testGetBlockLocationMetric() throws Exception {
|
public void testGetBlockLocationMetric() throws Exception {
|
||||||
Path file1_Path = new Path(TEST_ROOT_DIR_PATH, "file1.dat");
|
Path file1_Path = new Path(TEST_ROOT_DIR_PATH, "file1.dat");
|
||||||
|
|
||||||
@ -272,4 +281,46 @@ public void testGetBlockLocationMetric() throws Exception{
|
|||||||
updateMetrics();
|
updateMetrics();
|
||||||
assertCounter("GetBlockLocations", 3L, getMetrics(NN_METRICS));
|
assertCounter("GetBlockLocations", 3L, getMetrics(NN_METRICS));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test NN checkpoint and transaction-related metrics.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testTransactionAndCheckpointMetrics() throws Exception {
|
||||||
|
long lastCkptTime = MetricsAsserts.getLongGauge("LastCheckpointTime",
|
||||||
|
getMetrics(NS_METRICS));
|
||||||
|
|
||||||
|
assertGauge("LastCheckpointTime", lastCkptTime, getMetrics(NS_METRICS));
|
||||||
|
assertGauge("LastWrittenTransactionId", 1L, getMetrics(NS_METRICS));
|
||||||
|
assertGauge("TransactionsSinceLastCheckpoint", 1L, getMetrics(NS_METRICS));
|
||||||
|
assertGauge("TransactionsSinceLastLogRoll", 1L, getMetrics(NS_METRICS));
|
||||||
|
|
||||||
|
fs.mkdirs(new Path(TEST_ROOT_DIR_PATH, "/tmp"));
|
||||||
|
updateMetrics();
|
||||||
|
|
||||||
|
assertGauge("LastCheckpointTime", lastCkptTime, getMetrics(NS_METRICS));
|
||||||
|
assertGauge("LastWrittenTransactionId", 2L, getMetrics(NS_METRICS));
|
||||||
|
assertGauge("TransactionsSinceLastCheckpoint", 2L, getMetrics(NS_METRICS));
|
||||||
|
assertGauge("TransactionsSinceLastLogRoll", 2L, getMetrics(NS_METRICS));
|
||||||
|
|
||||||
|
cluster.getNameNodeRpc().rollEditLog();
|
||||||
|
updateMetrics();
|
||||||
|
|
||||||
|
assertGauge("LastCheckpointTime", lastCkptTime, getMetrics(NS_METRICS));
|
||||||
|
assertGauge("LastWrittenTransactionId", 4L, getMetrics(NS_METRICS));
|
||||||
|
assertGauge("TransactionsSinceLastCheckpoint", 4L, getMetrics(NS_METRICS));
|
||||||
|
assertGauge("TransactionsSinceLastLogRoll", 1L, getMetrics(NS_METRICS));
|
||||||
|
|
||||||
|
cluster.getNameNodeRpc().setSafeMode(SafeModeAction.SAFEMODE_ENTER);
|
||||||
|
cluster.getNameNodeRpc().saveNamespace();
|
||||||
|
cluster.getNameNodeRpc().setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
|
||||||
|
updateMetrics();
|
||||||
|
|
||||||
|
long newLastCkptTime = MetricsAsserts.getLongGauge("LastCheckpointTime",
|
||||||
|
getMetrics(NS_METRICS));
|
||||||
|
assertTrue(lastCkptTime < newLastCkptTime);
|
||||||
|
assertGauge("LastWrittenTransactionId", 6L, getMetrics(NS_METRICS));
|
||||||
|
assertGauge("TransactionsSinceLastCheckpoint", 1L, getMetrics(NS_METRICS));
|
||||||
|
assertGauge("TransactionsSinceLastLogRoll", 1L, getMetrics(NS_METRICS));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user