HDFS-10645. Make block report size as a metric and add this metric to datanode web ui. Contributed by Yuanbo Liu.
This commit is contained in:
parent
dbcaf999d9
commit
8179f9a493
@ -315,6 +315,7 @@ Each metrics record contains tags such as SessionId and Hostname as additional i
|
|||||||
| `TotalReadTime` | Total number of milliseconds spent on read operation |
|
| `TotalReadTime` | Total number of milliseconds spent on read operation |
|
||||||
| `RemoteBytesRead` | Number of bytes read by remote clients |
|
| `RemoteBytesRead` | Number of bytes read by remote clients |
|
||||||
| `RemoteBytesWritten` | Number of bytes written by remote clients |
|
| `RemoteBytesWritten` | Number of bytes written by remote clients |
|
||||||
|
| `BPServiceActorInfo` | The information about a block pool service actor |
|
||||||
|
|
||||||
yarn context
|
yarn context
|
||||||
============
|
============
|
||||||
|
@ -26,10 +26,13 @@ import java.net.InetSocketAddress;
|
|||||||
import java.net.SocketTimeoutException;
|
import java.net.SocketTimeoutException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.SortedSet;
|
||||||
|
import java.util.TreeSet;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.ThreadLocalRandom;
|
import java.util.concurrent.ThreadLocalRandom;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
@ -101,6 +104,9 @@ class BPServiceActor implements Runnable {
|
|||||||
private final DataNode dn;
|
private final DataNode dn;
|
||||||
private final DNConf dnConf;
|
private final DNConf dnConf;
|
||||||
private long prevBlockReportId;
|
private long prevBlockReportId;
|
||||||
|
private final SortedSet<Integer> blockReportSizes =
|
||||||
|
Collections.synchronizedSortedSet(new TreeSet<>());
|
||||||
|
private final int maxDataLength;
|
||||||
|
|
||||||
private final IncrementalBlockReportManager ibrManager;
|
private final IncrementalBlockReportManager ibrManager;
|
||||||
|
|
||||||
@ -122,6 +128,8 @@ class BPServiceActor implements Runnable {
|
|||||||
prevBlockReportId = ThreadLocalRandom.current().nextLong();
|
prevBlockReportId = ThreadLocalRandom.current().nextLong();
|
||||||
scheduler = new Scheduler(dnConf.heartBeatInterval,
|
scheduler = new Scheduler(dnConf.heartBeatInterval,
|
||||||
dnConf.getLifelineIntervalMs(), dnConf.blockReportInterval);
|
dnConf.getLifelineIntervalMs(), dnConf.blockReportInterval);
|
||||||
|
// get the value of maxDataLength.
|
||||||
|
this.maxDataLength = dnConf.getMaxDataLength();
|
||||||
}
|
}
|
||||||
|
|
||||||
public DatanodeRegistration getBpRegistration() {
|
public DatanodeRegistration getBpRegistration() {
|
||||||
@ -166,6 +174,8 @@ class BPServiceActor implements Runnable {
|
|||||||
String.valueOf(getScheduler().getLastHearbeatTime()));
|
String.valueOf(getScheduler().getLastHearbeatTime()));
|
||||||
info.put("LastBlockReport",
|
info.put("LastBlockReport",
|
||||||
String.valueOf(getScheduler().getLastBlockReportTime()));
|
String.valueOf(getScheduler().getLastBlockReportTime()));
|
||||||
|
info.put("maxBlockReportSize", String.valueOf(getMaxBlockReportSize()));
|
||||||
|
info.put("maxDataLength", String.valueOf(maxDataLength));
|
||||||
return info;
|
return info;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -305,6 +315,14 @@ class BPServiceActor implements Runnable {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private int getMaxBlockReportSize() {
|
||||||
|
int maxBlockReportSize = 0;
|
||||||
|
if (!blockReportSizes.isEmpty()) {
|
||||||
|
maxBlockReportSize = blockReportSizes.last();
|
||||||
|
}
|
||||||
|
return maxBlockReportSize;
|
||||||
|
}
|
||||||
|
|
||||||
private long generateUniqueBlockReportId() {
|
private long generateUniqueBlockReportId() {
|
||||||
// Initialize the block report ID the first time through.
|
// Initialize the block report ID the first time through.
|
||||||
// Note that 0 is used on the NN to indicate "uninitialized", so we should
|
// Note that 0 is used on the NN to indicate "uninitialized", so we should
|
||||||
@ -353,12 +371,18 @@ class BPServiceActor implements Runnable {
|
|||||||
boolean success = false;
|
boolean success = false;
|
||||||
long brSendStartTime = monotonicNow();
|
long brSendStartTime = monotonicNow();
|
||||||
long reportId = generateUniqueBlockReportId();
|
long reportId = generateUniqueBlockReportId();
|
||||||
|
boolean useBlocksBuffer =
|
||||||
|
bpRegistration.getNamespaceInfo().isCapabilitySupported(
|
||||||
|
NamespaceInfo.Capability.STORAGE_BLOCK_REPORT_BUFFERS);
|
||||||
|
blockReportSizes.clear();
|
||||||
try {
|
try {
|
||||||
if (totalBlockCount < dnConf.blockReportSplitThreshold) {
|
if (totalBlockCount < dnConf.blockReportSplitThreshold) {
|
||||||
// Below split threshold, send all reports in a single message.
|
// Below split threshold, send all reports in a single message.
|
||||||
DatanodeCommand cmd = bpNamenode.blockReport(
|
DatanodeCommand cmd = bpNamenode.blockReport(
|
||||||
bpRegistration, bpos.getBlockPoolId(), reports,
|
bpRegistration, bpos.getBlockPoolId(), reports,
|
||||||
new BlockReportContext(1, 0, reportId, fullBrLeaseId, true));
|
new BlockReportContext(1, 0, reportId, fullBrLeaseId, true));
|
||||||
|
blockReportSizes.add(
|
||||||
|
calculateBlockReportPBSize(useBlocksBuffer, reports));
|
||||||
numRPCs = 1;
|
numRPCs = 1;
|
||||||
numReportsSent = reports.length;
|
numReportsSent = reports.length;
|
||||||
if (cmd != null) {
|
if (cmd != null) {
|
||||||
@ -372,6 +396,8 @@ class BPServiceActor implements Runnable {
|
|||||||
bpRegistration, bpos.getBlockPoolId(), singleReport,
|
bpRegistration, bpos.getBlockPoolId(), singleReport,
|
||||||
new BlockReportContext(reports.length, r, reportId,
|
new BlockReportContext(reports.length, r, reportId,
|
||||||
fullBrLeaseId, true));
|
fullBrLeaseId, true));
|
||||||
|
blockReportSizes.add(
|
||||||
|
calculateBlockReportPBSize(useBlocksBuffer, singleReport));
|
||||||
numReportsSent++;
|
numReportsSent++;
|
||||||
numRPCs++;
|
numRPCs++;
|
||||||
if (cmd != null) {
|
if (cmd != null) {
|
||||||
@ -437,7 +463,22 @@ class BPServiceActor implements Runnable {
|
|||||||
}
|
}
|
||||||
return cmd;
|
return cmd;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private int calculateBlockReportPBSize(
|
||||||
|
boolean useBlocksBuffer, StorageBlockReport[] reports) {
|
||||||
|
int reportSize = 0;
|
||||||
|
|
||||||
|
for (StorageBlockReport r : reports) {
|
||||||
|
if (useBlocksBuffer) {
|
||||||
|
reportSize += r.getBlocks().getBlocksBuffer().size();
|
||||||
|
} else {
|
||||||
|
// each block costs 10 bytes in PB because of uint64
|
||||||
|
reportSize += 10 * r.getBlocks().getBlockListAsLongs().length;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return reportSize;
|
||||||
|
}
|
||||||
|
|
||||||
HeartbeatResponse sendHeartBeat(boolean requestBlockReportLease)
|
HeartbeatResponse sendHeartBeat(boolean requestBlockReportLease)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
scheduler.scheduleNextHeartbeat();
|
scheduler.scheduleNextHeartbeat();
|
||||||
|
@ -117,6 +117,7 @@ public class DNConf {
|
|||||||
|
|
||||||
private final int volFailuresTolerated;
|
private final int volFailuresTolerated;
|
||||||
private final int volsConfigured;
|
private final int volsConfigured;
|
||||||
|
private final int maxDataLength;
|
||||||
|
|
||||||
public DNConf(Configuration conf) {
|
public DNConf(Configuration conf) {
|
||||||
this.conf = conf;
|
this.conf = conf;
|
||||||
@ -149,6 +150,8 @@ public class DNConf {
|
|||||||
readaheadLength = conf.getLong(
|
readaheadLength = conf.getLong(
|
||||||
HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_KEY,
|
HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_KEY,
|
||||||
HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
|
HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
|
||||||
|
maxDataLength = conf.getInt(DFSConfigKeys.IPC_MAXIMUM_DATA_LENGTH,
|
||||||
|
DFSConfigKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT);
|
||||||
dropCacheBehindWrites = conf.getBoolean(
|
dropCacheBehindWrites = conf.getBoolean(
|
||||||
DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_KEY,
|
DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_KEY,
|
||||||
DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_DEFAULT);
|
DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_DEFAULT);
|
||||||
@ -389,4 +392,8 @@ public class DNConf {
|
|||||||
public int getVolsConfigured() {
|
public int getVolsConfigured() {
|
||||||
return volsConfigured;
|
return volsConfigured;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int getMaxDataLength() {
|
||||||
|
return maxDataLength;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -80,6 +80,7 @@
|
|||||||
<th>Actor State</th>
|
<th>Actor State</th>
|
||||||
<th>Last Heartbeat</th>
|
<th>Last Heartbeat</th>
|
||||||
<th>Last Block Report</th>
|
<th>Last Block Report</th>
|
||||||
|
<th>Last Block Report Size (Max Size)</th>
|
||||||
</tr>
|
</tr>
|
||||||
</thead>
|
</thead>
|
||||||
{#dn.BPServiceActorInfo}
|
{#dn.BPServiceActorInfo}
|
||||||
@ -89,6 +90,7 @@
|
|||||||
<td>{ActorState}</td>
|
<td>{ActorState}</td>
|
||||||
<td>{LastHeartbeat}s</td>
|
<td>{LastHeartbeat}s</td>
|
||||||
<td>{#helper_relative_time value="{LastBlockReport}"/}</td>
|
<td>{#helper_relative_time value="{LastBlockReport}"/}</td>
|
||||||
|
<td>{maxBlockReportSize|fmt_bytes} ({maxDataLength|fmt_bytes})</td>
|
||||||
</tr>
|
</tr>
|
||||||
{/dn.BPServiceActorInfo}
|
{/dn.BPServiceActorInfo}
|
||||||
</table>
|
</table>
|
||||||
|
@ -18,6 +18,7 @@
|
|||||||
package org.apache.hadoop.hdfs.server.datanode;
|
package org.apache.hadoop.hdfs.server.datanode;
|
||||||
|
|
||||||
import java.lang.management.ManagementFactory;
|
import java.lang.management.ManagementFactory;
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
@ -26,22 +27,31 @@ import javax.management.MBeanServer;
|
|||||||
import javax.management.ObjectName;
|
import javax.management.ObjectName;
|
||||||
|
|
||||||
import com.google.common.base.Supplier;
|
import com.google.common.base.Supplier;
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
import org.apache.hadoop.test.GenericTestUtils;
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
|
import org.codehaus.jackson.map.ObjectMapper;
|
||||||
|
import org.codehaus.jackson.type.TypeReference;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.mortbay.util.ajax.JSON;
|
import org.mortbay.util.ajax.JSON;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Class for testing {@link DataNodeMXBean} implementation
|
* Class for testing {@link DataNodeMXBean} implementation
|
||||||
*/
|
*/
|
||||||
public class TestDataNodeMXBean {
|
public class TestDataNodeMXBean {
|
||||||
|
|
||||||
|
public static final Log LOG = LogFactory.getLog(TestDataNodeMXBean.class);
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testDataNodeMXBean() throws Exception {
|
public void testDataNodeMXBean() throws Exception {
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
@ -99,6 +109,48 @@ public class TestDataNodeMXBean {
|
|||||||
return s.replaceAll("[0-9]+", "_DIGITS_");
|
return s.replaceAll("[0-9]+", "_DIGITS_");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDataNodeMXBeanBlockSize() throws Exception {
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
|
||||||
|
try(MiniDFSCluster cluster =
|
||||||
|
new MiniDFSCluster.Builder(conf).build()) {
|
||||||
|
DataNode dn = cluster.getDataNodes().get(0);
|
||||||
|
for (int i = 0; i < 100; i++) {
|
||||||
|
DFSTestUtil.writeFile(
|
||||||
|
cluster.getFileSystem(),
|
||||||
|
new Path("/foo" + String.valueOf(i) + ".txt"), "test content");
|
||||||
|
}
|
||||||
|
DataNodeTestUtils.triggerBlockReport(dn);
|
||||||
|
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
|
||||||
|
ObjectName mxbeanName = new ObjectName(
|
||||||
|
"Hadoop:service=DataNode,name=DataNodeInfo");
|
||||||
|
String bpActorInfo = (String)mbs.getAttribute(mxbeanName,
|
||||||
|
"BPServiceActorInfo");
|
||||||
|
Assert.assertEquals(dn.getBPServiceActorInfo(), bpActorInfo);
|
||||||
|
LOG.info("bpActorInfo is " + bpActorInfo);
|
||||||
|
TypeReference<ArrayList<Map<String, String>>> typeRef
|
||||||
|
= new TypeReference<ArrayList<Map<String, String>>>() {};
|
||||||
|
ArrayList<Map<String, String>> bpActorInfoList =
|
||||||
|
new ObjectMapper().readValue(bpActorInfo, typeRef);
|
||||||
|
int maxDataLength =
|
||||||
|
Integer.valueOf(bpActorInfoList.get(0).get("maxDataLength"));
|
||||||
|
int confMaxDataLength = dn.getConf().getInt(
|
||||||
|
CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH,
|
||||||
|
CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT);
|
||||||
|
int maxBlockReportSize =
|
||||||
|
Integer.valueOf(bpActorInfoList.get(0).get("maxBlockReportSize"));
|
||||||
|
LOG.info("maxDataLength is " + maxDataLength);
|
||||||
|
LOG.info("maxBlockReportSize is " + maxBlockReportSize);
|
||||||
|
assertTrue("maxBlockReportSize should be greater than zero",
|
||||||
|
maxBlockReportSize > 0);
|
||||||
|
assertEquals("maxDataLength should be exactly "
|
||||||
|
+ "the same value of ipc.maximum.data.length",
|
||||||
|
confMaxDataLength,
|
||||||
|
maxDataLength);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testDataNodeMXBeanBlockCount() throws Exception {
|
public void testDataNodeMXBeanBlockCount() throws Exception {
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
|
Loading…
x
Reference in New Issue
Block a user