HDFS-5693. Few NN metrics data points were collected via JMX when NN is under heavy load. Contributed by Ming Ma.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1589620 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
a5bec2b871
commit
24d1cf9ac6
@ -311,6 +311,9 @@ Release 2.5.0 - UNRELEASED
|
||||
|
||||
HDFS-6279. Create new index page for JN / DN. (wheat9)
|
||||
|
||||
HDFS-5693. Few NN metrics data points were collected via JMX when NN
|
||||
is under heavy load. (Ming Ma via jing9)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
HDFS-6214. Webhdfs has poor throughput for files >2GB (daryn)
|
||||
|
@ -3289,12 +3289,7 @@ public void removeBlockFromMap(Block block) {
|
||||
}
|
||||
|
||||
public int getCapacity() {
|
||||
namesystem.readLock();
|
||||
try {
|
||||
return blocksMap.getCapacity();
|
||||
} finally {
|
||||
namesystem.readUnlock();
|
||||
}
|
||||
return blocksMap.getCapacity();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1014,21 +1014,18 @@ public int getNumDeadDataNodes() {
|
||||
|
||||
/** @return list of datanodes where decommissioning is in progress. */
|
||||
public List<DatanodeDescriptor> getDecommissioningNodes() {
|
||||
namesystem.readLock();
|
||||
try {
|
||||
final List<DatanodeDescriptor> decommissioningNodes
|
||||
= new ArrayList<DatanodeDescriptor>();
|
||||
final List<DatanodeDescriptor> results = getDatanodeListForReport(
|
||||
DatanodeReportType.LIVE);
|
||||
for(DatanodeDescriptor node : results) {
|
||||
if (node.isDecommissionInProgress()) {
|
||||
decommissioningNodes.add(node);
|
||||
}
|
||||
// There is no need to take namesystem reader lock as
|
||||
// getDatanodeListForReport will synchronize on datanodeMap
|
||||
final List<DatanodeDescriptor> decommissioningNodes
|
||||
= new ArrayList<DatanodeDescriptor>();
|
||||
final List<DatanodeDescriptor> results = getDatanodeListForReport(
|
||||
DatanodeReportType.LIVE);
|
||||
for(DatanodeDescriptor node : results) {
|
||||
if (node.isDecommissionInProgress()) {
|
||||
decommissioningNodes.add(node);
|
||||
}
|
||||
return decommissioningNodes;
|
||||
} finally {
|
||||
namesystem.readUnlock();
|
||||
}
|
||||
return decommissioningNodes;
|
||||
}
|
||||
|
||||
/* Getter and Setter for stale DataNodes related attributes */
|
||||
@ -1081,23 +1078,20 @@ public void fetchDatanodes(final List<DatanodeDescriptor> live,
|
||||
throw new HadoopIllegalArgumentException("Both live and dead lists are null");
|
||||
}
|
||||
|
||||
namesystem.readLock();
|
||||
try {
|
||||
final List<DatanodeDescriptor> results =
|
||||
getDatanodeListForReport(DatanodeReportType.ALL);
|
||||
for(DatanodeDescriptor node : results) {
|
||||
if (isDatanodeDead(node)) {
|
||||
if (dead != null) {
|
||||
dead.add(node);
|
||||
}
|
||||
} else {
|
||||
if (live != null) {
|
||||
live.add(node);
|
||||
}
|
||||
// There is no need to take namesystem reader lock as
|
||||
// getDatanodeListForReport will synchronize on datanodeMap
|
||||
final List<DatanodeDescriptor> results =
|
||||
getDatanodeListForReport(DatanodeReportType.ALL);
|
||||
for(DatanodeDescriptor node : results) {
|
||||
if (isDatanodeDead(node)) {
|
||||
if (dead != null) {
|
||||
dead.add(node);
|
||||
}
|
||||
} else {
|
||||
if (live != null) {
|
||||
live.add(node);
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
namesystem.readUnlock();
|
||||
}
|
||||
|
||||
if (removeDecommissionNode) {
|
||||
|
@ -4652,8 +4652,12 @@ public class SafeModeInfo {
|
||||
private final double threshold;
|
||||
/** Safe mode minimum number of datanodes alive */
|
||||
private final int datanodeThreshold;
|
||||
/** Safe mode extension after the threshold. */
|
||||
private int extension;
|
||||
/**
|
||||
* Safe mode extension after the threshold.
|
||||
* Make it volatile so that getSafeModeTip can read the latest value
|
||||
* without taking a lock.
|
||||
*/
|
||||
private volatile int extension;
|
||||
/** Min replication required by safe mode. */
|
||||
private final int safeReplication;
|
||||
/** threshold for populating needed replication queues */
|
||||
@ -4675,8 +4679,12 @@ public class SafeModeInfo {
|
||||
private int blockReplQueueThreshold;
|
||||
/** time of the last status printout */
|
||||
private long lastStatusReport = 0;
|
||||
/** Was safemode entered automatically because available resources were low. */
|
||||
private boolean resourcesLow = false;
|
||||
/**
|
||||
* Was safemode entered automatically because available resources were low.
|
||||
* Make it volatile so that getSafeModeTip can read the latest value
|
||||
* without taking a lock.
|
||||
*/
|
||||
private volatile boolean resourcesLow = false;
|
||||
/** Should safemode adjust its block totals as blocks come in */
|
||||
private boolean shouldIncrementallyTrackBlocks = false;
|
||||
/** counter for tracking startup progress of reported blocks */
|
||||
@ -5351,14 +5359,21 @@ void leaveSafeMode() {
|
||||
}
|
||||
|
||||
String getSafeModeTip() {
|
||||
readLock();
|
||||
try {
|
||||
if (!isInSafeMode()) {
|
||||
return "";
|
||||
}
|
||||
// There is no need to take readLock.
|
||||
// Don't use isInSafeMode as this.safeMode might be set to null.
|
||||
// after isInSafeMode returns.
|
||||
boolean inSafeMode;
|
||||
SafeModeInfo safeMode = this.safeMode;
|
||||
if (safeMode == null) {
|
||||
inSafeMode = false;
|
||||
} else {
|
||||
inSafeMode = safeMode.isOn();
|
||||
}
|
||||
|
||||
if (!inSafeMode) {
|
||||
return "";
|
||||
} else {
|
||||
return safeMode.getTurnOffTip();
|
||||
} finally {
|
||||
readUnlock();
|
||||
}
|
||||
}
|
||||
|
||||
@ -5534,12 +5549,9 @@ public long getMaxObjects() {
|
||||
@Override // FSNamesystemMBean
|
||||
@Metric
|
||||
public long getFilesTotal() {
|
||||
readLock();
|
||||
try {
|
||||
return this.dir.totalInodes();
|
||||
} finally {
|
||||
readUnlock();
|
||||
}
|
||||
// There is no need to take fSNamesystem's lock as
|
||||
// FSDirectory has its own lock.
|
||||
return this.dir.totalInodes();
|
||||
}
|
||||
|
||||
@Override // FSNamesystemMBean
|
||||
@ -6119,6 +6131,23 @@ Collection<CorruptFileBlockInfo> listCorruptFileBlocks(String path,
|
||||
String[] cookieTab) throws IOException {
|
||||
checkSuperuserPrivilege();
|
||||
checkOperation(OperationCategory.READ);
|
||||
|
||||
int count = 0;
|
||||
ArrayList<CorruptFileBlockInfo> corruptFiles =
|
||||
new ArrayList<CorruptFileBlockInfo>();
|
||||
if (cookieTab == null) {
|
||||
cookieTab = new String[] { null };
|
||||
}
|
||||
|
||||
// Do a quick check if there are any corrupt files without taking the lock
|
||||
if (blockManager.getMissingBlocksCount() == 0) {
|
||||
if (cookieTab[0] == null) {
|
||||
cookieTab[0] = String.valueOf(getIntCookie(cookieTab[0]));
|
||||
}
|
||||
LOG.info("there are no corrupt file blocks.");
|
||||
return corruptFiles;
|
||||
}
|
||||
|
||||
readLock();
|
||||
try {
|
||||
checkOperation(OperationCategory.READ);
|
||||
@ -6127,14 +6156,9 @@ Collection<CorruptFileBlockInfo> listCorruptFileBlocks(String path,
|
||||
"replication queues have not been initialized.");
|
||||
}
|
||||
// print a limited # of corrupt files per call
|
||||
int count = 0;
|
||||
ArrayList<CorruptFileBlockInfo> corruptFiles = new ArrayList<CorruptFileBlockInfo>();
|
||||
|
||||
final Iterator<Block> blkIterator = blockManager.getCorruptReplicaBlockIterator();
|
||||
|
||||
if (cookieTab == null) {
|
||||
cookieTab = new String[] { null };
|
||||
}
|
||||
int skip = getIntCookie(cookieTab[0]);
|
||||
for (int i = 0; i < skip && blkIterator.hasNext(); i++) {
|
||||
blkIterator.next();
|
||||
|
@ -35,6 +35,87 @@
|
||||
*/
|
||||
public class TestFSNamesystemMBean {
|
||||
|
||||
/**
|
||||
* MBeanClient tries to access FSNamesystem/FSNamesystemState/NameNodeInfo
|
||||
* JMX properties. If it can access all the properties, the test is
|
||||
* considered successful.
|
||||
*/
|
||||
private static class MBeanClient extends Thread {
|
||||
private boolean succeeded = false;
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
|
||||
|
||||
// Metrics that belong to "FSNamesystem", these are metrics that
|
||||
// come from hadoop metrics framework for the class FSNamesystem.
|
||||
ObjectName mxbeanNamefsn = new ObjectName(
|
||||
"Hadoop:service=NameNode,name=FSNamesystem");
|
||||
Integer blockCapacity = (Integer) (mbs.getAttribute(mxbeanNamefsn,
|
||||
"BlockCapacity"));
|
||||
|
||||
// Metrics that belong to "FSNamesystemState".
|
||||
// These are metrics that FSNamesystem registers directly with MBeanServer.
|
||||
ObjectName mxbeanNameFsns = new ObjectName(
|
||||
"Hadoop:service=NameNode,name=FSNamesystemState");
|
||||
String FSState = (String) (mbs.getAttribute(mxbeanNameFsns,
|
||||
"FSState"));
|
||||
Long blocksTotal = (Long) (mbs.getAttribute(mxbeanNameFsns,
|
||||
"BlocksTotal"));
|
||||
Long capacityTotal = (Long) (mbs.getAttribute(mxbeanNameFsns,
|
||||
"CapacityTotal"));
|
||||
Long capacityRemaining = (Long) (mbs.getAttribute(mxbeanNameFsns,
|
||||
"CapacityRemaining"));
|
||||
Long capacityUsed = (Long) (mbs.getAttribute(mxbeanNameFsns,
|
||||
"CapacityUsed"));
|
||||
Long filesTotal = (Long) (mbs.getAttribute(mxbeanNameFsns,
|
||||
"FilesTotal"));
|
||||
Long pendingReplicationBlocks = (Long) (mbs.getAttribute(mxbeanNameFsns,
|
||||
"PendingReplicationBlocks"));
|
||||
Long underReplicatedBlocks = (Long) (mbs.getAttribute(mxbeanNameFsns,
|
||||
"UnderReplicatedBlocks"));
|
||||
Long scheduledReplicationBlocks = (Long) (mbs.getAttribute(mxbeanNameFsns,
|
||||
"ScheduledReplicationBlocks"));
|
||||
Integer totalLoad = (Integer) (mbs.getAttribute(mxbeanNameFsns,
|
||||
"TotalLoad"));
|
||||
Integer numLiveDataNodes = (Integer) (mbs.getAttribute(mxbeanNameFsns,
|
||||
"NumLiveDataNodes"));
|
||||
Integer numDeadDataNodes = (Integer) (mbs.getAttribute(mxbeanNameFsns,
|
||||
"NumDeadDataNodes"));
|
||||
Integer numStaleDataNodes = (Integer) (mbs.getAttribute(mxbeanNameFsns,
|
||||
"NumStaleDataNodes"));
|
||||
Integer numDecomLiveDataNodes = (Integer) (mbs.getAttribute(mxbeanNameFsns,
|
||||
"NumDecomLiveDataNodes"));
|
||||
Integer numDecomDeadDataNodes = (Integer) (mbs.getAttribute(mxbeanNameFsns,
|
||||
"NumDecomDeadDataNodes"));
|
||||
Integer numDecommissioningDataNodes = (Integer) (mbs.getAttribute(mxbeanNameFsns,
|
||||
"NumDecommissioningDataNodes"));
|
||||
String snapshotStats = (String) (mbs.getAttribute(mxbeanNameFsns,
|
||||
"SnapshotStats"));
|
||||
Long MaxObjects = (Long) (mbs.getAttribute(mxbeanNameFsns,
|
||||
"MaxObjects"));
|
||||
|
||||
// Metrics that belong to "NameNodeInfo".
|
||||
// These are metrics that FSNamesystem registers directly with MBeanServer.
|
||||
ObjectName mxbeanNameNni = new ObjectName(
|
||||
"Hadoop:service=NameNode,name=NameNodeInfo");
|
||||
String safemode = (String) (mbs.getAttribute(mxbeanNameNni,
|
||||
"Safemode"));
|
||||
String liveNodes = (String) (mbs.getAttribute(mxbeanNameNni,
|
||||
"LiveNodes"));
|
||||
String deadNodes = (String) (mbs.getAttribute(mxbeanNameNni,
|
||||
"DeadNodes"));
|
||||
String decomNodes = (String) (mbs.getAttribute(mxbeanNameNni,
|
||||
"DecomNodes"));
|
||||
String corruptFiles = (String) (mbs.getAttribute(mxbeanNameNni,
|
||||
"CorruptFiles"));
|
||||
|
||||
succeeded = true;
|
||||
} catch (Exception e) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test() throws Exception {
|
||||
Configuration conf = new Configuration();
|
||||
@ -73,4 +154,35 @@ public void test() throws Exception {
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// The test makes sure JMX request can be processed even if namesystem's
|
||||
// writeLock is owned by another thread.
|
||||
@Test
|
||||
public void testWithFSNamesystemWriteLock() throws Exception {
|
||||
Configuration conf = new Configuration();
|
||||
MiniDFSCluster cluster = null;
|
||||
FSNamesystem fsn = null;
|
||||
|
||||
try {
|
||||
cluster = new MiniDFSCluster.Builder(conf).build();
|
||||
cluster.waitActive();
|
||||
|
||||
fsn = cluster.getNameNode().namesystem;
|
||||
fsn.writeLock();
|
||||
|
||||
MBeanClient client = new MBeanClient();
|
||||
client.start();
|
||||
client.join(20000);
|
||||
assertTrue("JMX calls are blocked when FSNamesystem's writerlock" +
|
||||
"is owned by another thread", client.succeeded);
|
||||
client.interrupt();
|
||||
} finally {
|
||||
if (fsn != null && fsn.hasWriteLock()) {
|
||||
fsn.writeUnlock();
|
||||
}
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user