HDFS-7390. Provide JMX metrics per storage type. (Benoy Antony)

This commit is contained in:
Benoy Antony 2015-06-29 11:00:22 -07:00
parent fde20ffcef
commit d3fed8e653
6 changed files with 412 additions and 2 deletions

View File

@ -39,6 +39,8 @@
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
import javax.management.ObjectName;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
@ -85,6 +87,7 @@
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.apache.hadoop.hdfs.util.LightWeightLinkedSet;
import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Daemon;
@ -94,6 +97,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -101,7 +105,7 @@
* Keeps information related to the blocks stored in the Hadoop cluster.
*/
@InterfaceAudience.Private
public class BlockManager {
public class BlockManager implements BlockStatsMXBean {
public static final Logger LOG = LoggerFactory.getLogger(BlockManager.class);
public static final Logger blockLog = NameNode.blockStateChangeLog;
@ -129,6 +133,7 @@ public class BlockManager {
private final AtomicLong postponedMisreplicatedBlocksCount = new AtomicLong(0L);
private final long startupDelayBlockDeletionInMs;
private final BlockReportLeaseManager blockReportLeaseManager;
private ObjectName mxBeanName;
/** Used by metrics */
public long getPendingReplicationBlocksCount() {
@ -468,6 +473,7 @@ public void activate(Configuration conf) {
pendingReplications.start();
datanodeManager.activate(conf);
this.replicationThread.start();
mxBeanName = MBeans.register("NameNode", "BlockStats", this);
}
public void close() {
@ -3944,6 +3950,8 @@ enum MisReplicationResult {
public void shutdown() {
stopReplicationInitializer();
blocksMap.close();
MBeans.unregister(mxBeanName);
mxBeanName = null;
}
public void clear() {
@ -3954,4 +3962,9 @@ public void clear() {
public BlockReportLeaseManager getBlockReportLeaseManager() {
return blockReportLeaseManager;
}
@Override // BlockStatsMXBean
public Map<StorageType, StorageTypeStats> getStorageTypeStats() {
return datanodeManager.getDatanodeStatistics().getStorageTypeStats();
}
}

View File

@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import java.util.Map;
import org.apache.hadoop.fs.StorageType;
/**
* This is an interface used to retrieve statistic information related to
* block management.
*/
public interface BlockStatsMXBean {
/**
* The statistics of storage types.
*
* @return get storage statistics per storage type
*/
Map<StorageType, StorageTypeStats> getStorageTypeStats();
}

View File

@ -17,6 +17,9 @@
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import java.util.Map;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
/** Datanode statistics */
@ -71,4 +74,7 @@ public interface DatanodeStatistics {
/** @return the expired heartbeats */
public int getExpiredHeartbeats();
/** @return Storage Tier statistics*/
Map<StorageType, StorageTypeStats> getStorageTypeStats();
}

View File

@ -18,9 +18,15 @@
package org.apache.hadoop.hdfs.server.blockmanagement;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
@ -189,6 +195,11 @@ public synchronized int getExpiredHeartbeats() {
return stats.expiredHeartbeats;
}
@Override
public Map<StorageType, StorageTypeStats> getStorageTypeStats() {
return stats.statsMap.get();
}
synchronized void register(final DatanodeDescriptor d) {
if (!d.isAlive) {
addDatanode(d);
@ -393,6 +404,9 @@ public void run() {
* For decommissioning/decommissioned nodes, only used capacity is counted.
*/
private static class Stats {
private final StorageTypeStatsMap statsMap = new StorageTypeStatsMap();
private long capacityTotal = 0L;
private long capacityUsed = 0L;
private long capacityRemaining = 0L;
@ -420,6 +434,14 @@ private void add(final DatanodeDescriptor node) {
}
cacheCapacity += node.getCacheCapacity();
cacheUsed += node.getCacheUsed();
Set<StorageType> storageTypes = new HashSet<>();
for (DatanodeStorageInfo storageInfo : node.getStorageInfos()) {
statsMap.addStorage(storageInfo, node);
storageTypes.add(storageInfo.getStorageType());
}
for (StorageType storageType : storageTypes) {
statsMap.addNode(storageType, node);
}
}
private void subtract(final DatanodeDescriptor node) {
@ -436,6 +458,14 @@ private void subtract(final DatanodeDescriptor node) {
}
cacheCapacity -= node.getCacheCapacity();
cacheUsed -= node.getCacheUsed();
Set<StorageType> storageTypes = new HashSet<>();
for (DatanodeStorageInfo storageInfo : node.getStorageInfos()) {
statsMap.subtractStorage(storageInfo, node);
storageTypes.add(storageInfo.getStorageType());
}
for (StorageType storageType : storageTypes) {
statsMap.subtractNode(storageType, node);
}
}
/** Increment expired heartbeat counter. */
@ -443,5 +473,69 @@ private void incrExpiredHeartbeats() {
expiredHeartbeats++;
}
}
}
/** StorageType specific statistics.
* For decommissioning/decommissioned nodes, only used capacity is counted.
*/
static final class StorageTypeStatsMap {
private Map<StorageType, StorageTypeStats> storageTypeStatsMap =
new IdentityHashMap<>();
private StorageTypeStatsMap() {}
private StorageTypeStatsMap(StorageTypeStatsMap other) {
storageTypeStatsMap =
new IdentityHashMap<>(other.storageTypeStatsMap);
for (Map.Entry<StorageType, StorageTypeStats> entry :
storageTypeStatsMap.entrySet()) {
entry.setValue(new StorageTypeStats(entry.getValue()));
}
}
private Map<StorageType, StorageTypeStats> get() {
return Collections.unmodifiableMap(storageTypeStatsMap);
}
private void addNode(StorageType storageType,
final DatanodeDescriptor node) {
StorageTypeStats storageTypeStats =
storageTypeStatsMap.get(storageType);
if (storageTypeStats == null) {
storageTypeStats = new StorageTypeStats();
storageTypeStatsMap.put(storageType, storageTypeStats);
}
storageTypeStats.addNode(node);
}
private void addStorage(final DatanodeStorageInfo info,
final DatanodeDescriptor node) {
StorageTypeStats storageTypeStats =
storageTypeStatsMap.get(info.getStorageType());
if (storageTypeStats == null) {
storageTypeStats = new StorageTypeStats();
storageTypeStatsMap.put(info.getStorageType(), storageTypeStats);
}
storageTypeStats.addStorage(info, node);
}
private void subtractStorage(final DatanodeStorageInfo info,
final DatanodeDescriptor node) {
StorageTypeStats storageTypeStats =
storageTypeStatsMap.get(info.getStorageType());
if (storageTypeStats != null) {
storageTypeStats.subtractStorage(info, node);
}
}
private void subtractNode(StorageType storageType,
final DatanodeDescriptor node) {
StorageTypeStats storageTypeStats =
storageTypeStatsMap.get(storageType);
if (storageTypeStats != null) {
storageTypeStats.subtractNode(node);
}
}
}
}

View File

@ -0,0 +1,115 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import java.beans.ConstructorProperties;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* Statistics per StorageType.
*
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class StorageTypeStats {
private long capacityTotal = 0L;
private long capacityUsed = 0L;
private long capacityRemaining = 0L;
private long blockPoolUsed = 0L;
private int nodesInService = 0;
@ConstructorProperties({"capacityTotal",
"capacityUsed", "capacityRemaining", "blockPoolUsed", "nodesInService"})
public StorageTypeStats(long capacityTotal, long capacityUsed,
long capacityRemaining, long blockPoolUsed, int nodesInService) {
this.capacityTotal = capacityTotal;
this.capacityUsed = capacityUsed;
this.capacityRemaining = capacityRemaining;
this.blockPoolUsed = blockPoolUsed;
this.nodesInService = nodesInService;
}
public long getCapacityTotal() {
return capacityTotal;
}
public long getCapacityUsed() {
return capacityUsed;
}
public long getCapacityRemaining() {
return capacityRemaining;
}
public long getBlockPoolUsed() {
return blockPoolUsed;
}
public int getNodesInService() {
return nodesInService;
}
StorageTypeStats() {}
StorageTypeStats(StorageTypeStats other) {
capacityTotal = other.capacityTotal;
capacityUsed = other.capacityUsed;
capacityRemaining = other.capacityRemaining;
blockPoolUsed = other.blockPoolUsed;
nodesInService = other.nodesInService;
}
void addStorage(final DatanodeStorageInfo info,
final DatanodeDescriptor node) {
capacityUsed += info.getDfsUsed();
blockPoolUsed += info.getBlockPoolUsed();
if (!(node.isDecommissionInProgress() || node.isDecommissioned())) {
capacityTotal += info.getCapacity();
capacityRemaining += info.getRemaining();
} else {
capacityTotal += info.getDfsUsed();
}
}
void addNode(final DatanodeDescriptor node) {
if (!(node.isDecommissionInProgress() || node.isDecommissioned())) {
nodesInService++;
}
}
void subtractStorage(final DatanodeStorageInfo info,
final DatanodeDescriptor node) {
capacityUsed -= info.getDfsUsed();
blockPoolUsed -= info.getBlockPoolUsed();
if (!(node.isDecommissionInProgress() || node.isDecommissioned())) {
capacityTotal -= info.getCapacity();
capacityRemaining -= info.getRemaining();
} else {
capacityTotal -= info.getDfsUsed();
}
}
void subtractNode(final DatanodeDescriptor node) {
if (!(node.isDecommissionInProgress() || node.isDecommissioned())) {
nodesInService--;
}
}
}

View File

@ -0,0 +1,146 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mortbay.util.ajax.JSON;
/**
* Class for testing {@link BlockStatsMXBean} implementation
*/
public class TestBlockStatsMXBean {
private MiniDFSCluster cluster;
@Before
public void setup() throws IOException {
HdfsConfiguration conf = new HdfsConfiguration();
cluster = null;
StorageType[][] types = new StorageType[6][];
for (int i=0; i<3; i++) {
types[i] = new StorageType[] {StorageType.RAM_DISK, StorageType.DISK};
}
for (int i=3; i< 5; i++) {
types[i] = new StorageType[] {StorageType.RAM_DISK, StorageType.ARCHIVE};
}
types[5] = new StorageType[] {StorageType.RAM_DISK, StorageType.ARCHIVE,
StorageType.ARCHIVE};
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(6).
storageTypes(types).storagesPerDatanode(3).build();
cluster.waitActive();
}
@After
public void tearDown() {
if (cluster != null) {
cluster.shutdown();
}
}
@Test
public void testStorageTypeStats() throws Exception {
Map<StorageType, StorageTypeStats> storageTypeStatsMap =
cluster.getNamesystem().getBlockManager().getStorageTypeStats();
assertTrue(storageTypeStatsMap.containsKey(StorageType.RAM_DISK));
assertTrue(storageTypeStatsMap.containsKey(StorageType.DISK));
assertTrue(storageTypeStatsMap.containsKey(StorageType.ARCHIVE));
StorageTypeStats storageTypeStats =
storageTypeStatsMap.get(StorageType.RAM_DISK);
assertEquals(6, storageTypeStats.getNodesInService());
storageTypeStats = storageTypeStatsMap.get(StorageType.DISK);
assertEquals(3, storageTypeStats.getNodesInService());
storageTypeStats = storageTypeStatsMap.get(StorageType.ARCHIVE);
assertEquals(3, storageTypeStats.getNodesInService());
}
protected static String readOutput(URL url) throws IOException {
StringBuilder out = new StringBuilder();
InputStream in = url.openConnection().getInputStream();
byte[] buffer = new byte[64 * 1024];
int len = in.read(buffer);
while (len > 0) {
out.append(new String(buffer, 0, len));
len = in.read(buffer);
}
return out.toString();
}
@Test
@SuppressWarnings("unchecked")
public void testStorageTypeStatsJMX() throws Exception {
URL baseUrl = new URL (cluster.getHttpUri(0));
String result = readOutput(new URL(baseUrl, "/jmx"));
System.out.println(result);
Map<String, Object> stat = (Map<String, Object>) JSON.parse(result);
Object[] beans =(Object[]) stat.get("beans");
Map<String, Object> blockStats = null;
for (Object bean : beans) {
Map<String, Object> map = (Map<String, Object>) bean;
if (map.get("name").equals("Hadoop:service=NameNode,name=BlockStats")) {
blockStats = map;
}
}
assertNotNull(blockStats);
Object[] storageTypeStatsList =
(Object[])blockStats.get("StorageTypeStats");
assertNotNull(storageTypeStatsList);
assertEquals (3, storageTypeStatsList.length);
Set<String> typesPresent = new HashSet<> ();
for (Object obj : storageTypeStatsList) {
Map<String, Object> entry = (Map<String, Object>)obj;
String storageType = (String)entry.get("key");
Map<String,Object> storageTypeStats = (Map<String,Object>)entry.get("value");
typesPresent.add(storageType);
if (storageType.equals("ARCHIVE") || storageType.equals("DISK") ) {
assertEquals(3l, storageTypeStats.get("nodesInService"));
} else if (storageType.equals("RAM_DISK")) {
assertEquals(6l, storageTypeStats.get("nodesInService"));
}
else {
fail();
}
}
assertTrue(typesPresent.contains("ARCHIVE"));
assertTrue(typesPresent.contains("DISK"));
assertTrue(typesPresent.contains("RAM_DISK"));
}
}