HDFS-9425. Expose number of blocks per volume as a metric (Contributed by Brahma Reddy Battula)
parent f313516731
commit 342c9572bf
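For context, the per-volume block count added by this patch is published through the DataNode's existing DataNodeInfo MBean as a new "numBlocks" entry inside the "VolumeInfo" JSON attribute (see the getVolumeInfoMap() change and the new test below). The following is a minimal in-process sketch of how a client could read it: only the MBean name "Hadoop:service=DataNode,name=DataNodeInfo", the "VolumeInfo" attribute, and the "numBlocks" key come from this patch and its test; the reader class itself is hypothetical and assumes it runs in the same JVM as the DataNode (for example next to a MiniDFSCluster).

import java.lang.management.ManagementFactory;
import java.util.Map;

import javax.management.MBeanServer;
import javax.management.ObjectName;

import org.mortbay.util.ajax.JSON;

/**
 * Illustrative sketch: sum the per-volume "numBlocks" values that this
 * change adds to the DataNode's VolumeInfo JMX attribute. Bean and
 * attribute names come from TestDataNodeMXBean; everything else is assumed.
 */
public class VolumeBlockCountReader {
  public static void main(String[] args) throws Exception {
    MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
    ObjectName name =
        new ObjectName("Hadoop:service=DataNode,name=DataNodeInfo");

    // VolumeInfo is a JSON string: { "<volume dir>": { "numBlocks": N, ... }, ... }
    String volumeInfo = (String) mbs.getAttribute(name, "VolumeInfo");
    Map<?, ?> volumes = (Map<?, ?>) JSON.parse(volumeInfo);

    long total = 0;
    for (Map.Entry<?, ?> e : volumes.entrySet()) {
      Map<?, ?> inner = (Map<?, ?>) e.getValue();
      long numBlocks = ((Number) inner.get("numBlocks")).longValue();
      System.out.println(e.getKey() + " -> " + numBlocks + " blocks");
      total += numBlocks;
    }
    System.out.println("Total blocks across volumes: " + total);
  }
}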
CHANGES.txt
@@ -2024,6 +2024,9 @@ Release 2.8.0 - UNRELEASED
     HDFS-9768. Reuse ObjectMapper instance in HDFS to improve the performance.
     (Lin Yiqun via aajisaka)
 
+    HDFS-9425. Expose number of blocks per volume as a metric
+    (Brahma Reddy Battula via vinayakumarb)
+
   BUG FIXES
 
     HDFS-7501. TransactionsSinceLastCheckpoint can be negative on SBNs.
BlockPoolSlice.java
@@ -30,6 +30,7 @@
 import java.io.Writer;
 import java.util.Iterator;
 import java.util.Scanner;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.logging.Log;
@@ -86,6 +87,7 @@ class BlockPoolSlice {
   private final boolean deleteDuplicateReplicas;
   private static final String REPLICA_CACHE_FILE = "replicas";
   private final long replicaCacheExpiry = 5*60*1000;
+  private AtomicLong numOfBlocks = new AtomicLong();
   private final long cachedDfsUsedCheckTime;
   private final Timer timer;
 
@@ -273,7 +275,11 @@ void saveDfsUsed() {
    */
  File createTmpFile(Block b) throws IOException {
    File f = new File(tmpDir, b.getBlockName());
-    return DatanodeUtil.createTmpFile(b, f);
+    File tmpFile = DatanodeUtil.createTmpFile(b, f);
+    // If any exception during creation, its expected that counter will not be
+    // incremented, So no need to decrement
+    incrNumBlocks();
+    return tmpFile;
  }
 
  /**
@@ -282,7 +288,11 @@ File createTmpFile(Block b) throws IOException {
    */
  File createRbwFile(Block b) throws IOException {
    File f = new File(rbwDir, b.getBlockName());
-    return DatanodeUtil.createTmpFile(b, f);
+    File rbwFile = DatanodeUtil.createTmpFile(b, f);
+    // If any exception during creation, its expected that counter will not be
+    // incremented, So no need to decrement
+    incrNumBlocks();
+    return rbwFile;
  }
 
  File addFinalizedBlock(Block b, File f) throws IOException {
@@ -493,6 +503,9 @@ private void addReplicaToReplicasMap(Block block, ReplicaMap volumeMap,
     } else {
       lazyWriteReplicaMap.discardReplica(bpid, blockId, false);
     }
+    if (oldReplica == null) {
+      incrNumBlocks();
+    }
   }
 
 
@@ -825,4 +838,16 @@ private void saveReplicas(BlockListAsLongs blocksListToPersist) {
       }
     }
   }
+
+  void incrNumBlocks() {
+    numOfBlocks.incrementAndGet();
+  }
+
+  void decrNumBlocks() {
+    numOfBlocks.decrementAndGet();
+  }
+
+  public long getNumOfBlocks() {
+    return numOfBlocks.get();
+  }
 }
FsDatasetImpl.java
@@ -965,6 +965,11 @@ public ReplicaInfo moveBlockAcrossStorage(ExtendedBlock block,
     newReplicaInfo.setNumBytes(blockFiles[1].length());
     // Finalize the copied files
     newReplicaInfo = finalizeReplica(block.getBlockPoolId(), newReplicaInfo);
+    synchronized (this) {
+      // Increment numBlocks here as this block moved without knowing to BPS
+      FsVolumeImpl volume = (FsVolumeImpl) newReplicaInfo.getVolume();
+      volume.getBlockPoolSlice(block.getBlockPoolId()).incrNumBlocks();
+    }
 
     removeOldReplica(replicaInfo, newReplicaInfo, oldBlockFile, oldMetaFile,
         oldBlockFile.length(), oldMetaFile.length(), block.getBlockPoolId());
@@ -2599,6 +2604,7 @@ private static class VolumeInfo {
     final long reservedSpace; // size of space reserved for non-HDFS
     final long reservedSpaceForReplicas; // size of space reserved RBW or
                                          // re-replication
+    final long numBlocks;
 
     VolumeInfo(FsVolumeImpl v, long usedSpace, long freeSpace) {
       this.directory = v.toString();
@@ -2606,6 +2612,7 @@ private static class VolumeInfo {
       this.freeSpace = freeSpace;
       this.reservedSpace = v.getReserved();
       this.reservedSpaceForReplicas = v.getReservedForReplicas();
+      this.numBlocks = v.getNumBlocks();
     }
   }
 
@@ -2640,6 +2647,7 @@ public Map<String, Object> getVolumeInfoMap() {
       innerInfo.put("freeSpace", v.freeSpace);
       innerInfo.put("reservedSpace", v.reservedSpace);
       innerInfo.put("reservedSpaceForReplicas", v.reservedSpaceForReplicas);
+      innerInfo.put("numBlocks", v.numBlocks);
       info.put(v.directory, innerInfo);
     }
     return info;
@@ -2728,8 +2736,8 @@ public void onCompleteLazyPersist(String bpId, long blockId,
       synchronized (FsDatasetImpl.this) {
         ramDiskReplicaTracker.recordEndLazyPersist(bpId, blockId, savedFiles);
 
-        targetVolume.incDfsUsed(bpId,
-            savedFiles[0].length() + savedFiles[1].length());
+        targetVolume.incDfsUsedAndNumBlocks(bpId, savedFiles[0].length()
+            + savedFiles[1].length());
 
         // Update metrics (ignore the metadata file size)
         datanode.getMetrics().incrRamDiskBlocksLazyPersisted();
FsVolumeImpl.java
@@ -284,21 +284,35 @@ File getTmpDir(String bpid) throws IOException {
   }
 
   void onBlockFileDeletion(String bpid, long value) {
-    decDfsUsed(bpid, value);
+    decDfsUsedAndNumBlocks(bpid, value, true);
     if (isTransientStorage()) {
       dataset.releaseLockedMemory(value, true);
     }
   }
 
   void onMetaFileDeletion(String bpid, long value) {
-    decDfsUsed(bpid, value);
+    decDfsUsedAndNumBlocks(bpid, value, false);
   }
 
-  private void decDfsUsed(String bpid, long value) {
+  private void decDfsUsedAndNumBlocks(String bpid, long value,
+      boolean blockFileDeleted) {
     synchronized(dataset) {
       BlockPoolSlice bp = bpSlices.get(bpid);
       if (bp != null) {
         bp.decDfsUsed(value);
+        if (blockFileDeleted) {
+          bp.decrNumBlocks();
+        }
       }
     }
   }
+
+  void incDfsUsedAndNumBlocks(String bpid, long value) {
+    synchronized (dataset) {
+      BlockPoolSlice bp = bpSlices.get(bpid);
+      if (bp != null) {
+        bp.incDfsUsed(value);
+        bp.incrNumBlocks();
+      }
+    }
+  }
@@ -847,7 +861,15 @@ void getVolumeMap(String bpid, ReplicaMap volumeMap,
       throws IOException {
     getBlockPoolSlice(bpid).getVolumeMap(volumeMap, ramDiskReplicaMap);
   }
 
+  long getNumBlocks() {
+    long numBlocks = 0;
+    for (BlockPoolSlice s : bpSlices.values()) {
+      numBlocks += s.getNumOfBlocks();
+    }
+    return numBlocks;
+  }
+
   @Override
   public String toString() {
     return currentDir.getAbsolutePath();
TestDataNodeMXBean.java
@@ -18,15 +18,23 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
 import java.lang.management.ManagementFactory;
+import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.junit.Assert;
 import org.junit.Test;
 import org.mortbay.util.ajax.JSON;
+
+import static org.junit.Assert.assertEquals;
+
 /**
  * Class for testing {@link DataNodeMXBean} implementation
@@ -84,4 +92,51 @@ public void testDataNodeMXBean() throws Exception {
   private static String replaceDigits(final String s) {
     return s.replaceAll("[0-9]+", "_DIGITS_");
   }
+
+  @Test
+  public void testDataNodeMXBeanBlockCount() throws Exception {
+    Configuration conf = new Configuration();
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
+
+    try {
+      List<DataNode> datanodes = cluster.getDataNodes();
+      assertEquals(datanodes.size(), 1);
+
+      MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+      ObjectName mxbeanName =
+          new ObjectName("Hadoop:service=DataNode,name=DataNodeInfo");
+      FileSystem fs = cluster.getFileSystem();
+      for (int i = 0; i < 5; i++) {
+        DFSTestUtil.createFile(fs, new Path("/tmp.txt" + i), 1024, (short) 1,
+            1L);
+      }
+      assertEquals("Before restart DN", 5, getTotalNumBlocks(mbs, mxbeanName));
+      cluster.restartDataNode(0);
+      cluster.waitActive();
+      assertEquals("After restart DN", 5, getTotalNumBlocks(mbs, mxbeanName));
+      fs.delete(new Path("/tmp.txt1"), true);
+      // Wait till replica gets deleted on disk.
+      Thread.sleep(5000);
+      assertEquals("After delete one file", 4,
+          getTotalNumBlocks(mbs, mxbeanName));
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  int getTotalNumBlocks(MBeanServer mbs, ObjectName mxbeanName)
+      throws Exception {
+    int totalBlocks = 0;
+    String volumeInfo = (String) mbs.getAttribute(mxbeanName, "VolumeInfo");
+    Map<?, ?> m = (Map<?, ?>) JSON.parse(volumeInfo);
+    Collection<Map<String, Long>> values =
+        (Collection<Map<String, Long>>) m.values();
+    for (Map<String, Long> volumeInfoMap : values) {
+      totalBlocks += volumeInfoMap.get("numBlocks");
+    }
+    return totalBlocks;
+  }
 }
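For operators who would rather not attach in-process, the same DataNodeInfo bean is also reachable over the DataNode's HTTP /jmx servlet. The sketch below is illustrative only: the host and port (localhost:50075, the 2.x default DataNode HTTP port) and the helper class are assumptions, not part of this patch; the bean name and the VolumeInfo/numBlocks fields are the ones exercised by the test above.

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

/**
 * Illustrative sketch: fetch the DataNodeInfo bean (whose VolumeInfo JSON
 * now carries "numBlocks" per volume) from the DataNode's /jmx servlet and
 * print the raw response. Host and port are assumptions, not part of the patch.
 */
public class DataNodeJmxFetch {
  public static void main(String[] args) throws Exception {
    URL url = new URL(
        "http://localhost:50075/jmx?qry=Hadoop:service=DataNode,name=DataNodeInfo");
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    try (BufferedReader in = new BufferedReader(
        new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
      String line;
      while ((line = in.readLine()) != null) {
        // VolumeInfo appears as an escaped JSON string inside the response
        System.out.println(line);
      }
    } finally {
      conn.disconnect();
    }
  }
}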