HDFS-10928. Ozone:SCM: Support MXBean for SCM and NodeManager. Contributed by Weiwei Yang.

This commit is contained in:
Anu Engineer 2017-03-08 18:52:57 -08:00 committed by Owen O'Malley
parent 386dbc11da
commit 1058aa2523
8 changed files with 346 additions and 32 deletions

View File

@ -0,0 +1,52 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.scm;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.ozone.scm.node.SCMNodeManager;
import java.util.Map;
/**
 * This is the JMX management interface for SCM information.
 */
@InterfaceAudience.Private
public interface SCMMXBean {

  /**
   * Get the number of data nodes in each valid state; valid states
   * are defined by {@link SCMNodeManager.NODESTATE}.
   *
   * @return a mapping from node state to the number of nodes in that state
   */
  Map<String, Integer> getNodeCount();

  /**
   * Get the SCM RPC server port used to listen to datanode requests.
   *
   * @return SCM datanode RPC server port
   */
  String getDatanodeRpcPort();

  /**
   * Get the SCM RPC server port used to listen to client requests.
   *
   * @return SCM client RPC server port
   */
  String getClientRpcPort();
}

View File

@ -26,6 +26,7 @@
import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.ipc.ProtobufRpcEngine; import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.ozone.OzoneClientUtils; import org.apache.hadoop.ozone.OzoneClientUtils;
import org.apache.hadoop.ozone.OzoneConfiguration; import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.scm.protocol.LocatedContainer; import org.apache.hadoop.scm.protocol.LocatedContainer;
@ -73,12 +74,15 @@
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import javax.management.ObjectName;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.UUID; import java.util.UUID;
import java.util.Map;
import java.util.HashMap;
import static org.apache.hadoop.ozone.OzoneConfigKeys import static org.apache.hadoop.ozone.OzoneConfigKeys
.OZONE_SCM_CLIENT_ADDRESS_KEY; .OZONE_SCM_CLIENT_ADDRESS_KEY;
@ -107,7 +111,7 @@
@InterfaceAudience.LimitedPrivate({"HDFS", "CBLOCK", "OZONE", "HBASE"}) @InterfaceAudience.LimitedPrivate({"HDFS", "CBLOCK", "OZONE", "HBASE"})
public class StorageContainerManager public class StorageContainerManager
implements StorageContainerDatanodeProtocol, implements StorageContainerDatanodeProtocol,
StorageContainerLocationProtocol { StorageContainerLocationProtocol, SCMMXBean {
private static final Logger LOG = private static final Logger LOG =
LoggerFactory.getLogger(StorageContainerManager.class); LoggerFactory.getLogger(StorageContainerManager.class);
@ -126,6 +130,9 @@ public class StorageContainerManager
private final RPC.Server clientRpcServer; private final RPC.Server clientRpcServer;
private final InetSocketAddress clientRpcAddress; private final InetSocketAddress clientRpcAddress;
/** SCM mxbean*/
private ObjectName scmInfoBeanName;
/** /**
* Creates a new StorageContainerManager. Configuration will be updated with * Creates a new StorageContainerManager. Configuration will be updated with
* information on the actual listening addresses used for RPC servers. * information on the actual listening addresses used for RPC servers.
@ -161,7 +168,6 @@ public StorageContainerManager(OzoneConfiguration conf)
datanodeRpcAddress = updateListenAddress(conf, datanodeRpcAddress = updateListenAddress(conf,
OZONE_SCM_DATANODE_ADDRESS_KEY, datanodeRpcAddr, datanodeRpcServer); OZONE_SCM_DATANODE_ADDRESS_KEY, datanodeRpcAddr, datanodeRpcServer);
BlockingService storageProtoPbService = BlockingService storageProtoPbService =
StorageContainerLocationProtocolProtos StorageContainerLocationProtocolProtos
.StorageContainerLocationProtocolService .StorageContainerLocationProtocolService
@ -176,6 +182,8 @@ public StorageContainerManager(OzoneConfiguration conf)
handlerCount); handlerCount);
clientRpcAddress = updateListenAddress(conf, clientRpcAddress = updateListenAddress(conf,
OZONE_SCM_CLIENT_ADDRESS_KEY, scmAddress, clientRpcServer); OZONE_SCM_CLIENT_ADDRESS_KEY, scmAddress, clientRpcServer);
registerMXBean();
} }
/** /**
@ -222,6 +230,18 @@ private static RPC.Server startRpcServer(OzoneConfiguration conf,
return rpcServer; return rpcServer;
} }
private void registerMXBean() {
  // Publish this SCM instance over JMX as
  // Hadoop:service=StorageContainerManager,name=StorageContainerManagerInfo.
  scmInfoBeanName = MBeans.register("StorageContainerManager",
      "StorageContainerManagerInfo", this);
}
private void unregisterMXBean() {
  // Guard clause: nothing to do if the bean was never registered
  // (or has already been unregistered).
  if (scmInfoBeanName == null) {
    return;
  }
  MBeans.unregister(scmInfoBeanName);
  scmInfoBeanName = null;
}
/** /**
* After starting an RPC server, updates configuration with the actual * After starting an RPC server, updates configuration with the actual
* listening address of that server. The listening address may be different * listening address of that server. The listening address may be different
@ -334,6 +354,12 @@ public InetSocketAddress getClientRpcAddress() {
return clientRpcAddress; return clientRpcAddress;
} }
@Override
public String getClientRpcPort() {
  // Client RPC port as a string; "0" when the address is unknown.
  InetSocketAddress address = getClientRpcAddress();
  if (address == null) {
    return "0";
  }
  return String.valueOf(address.getPort());
}
/** /**
* Returns listening address of StorageDatanode Protocol RPC server. * Returns listening address of StorageDatanode Protocol RPC server.
* *
@ -343,6 +369,12 @@ public InetSocketAddress getDatanodeRpcAddress() {
return datanodeRpcAddress; return datanodeRpcAddress;
} }
@Override
public String getDatanodeRpcPort() {
  // Datanode RPC port as a string; "0" when the address is unknown.
  InetSocketAddress address = getDatanodeRpcAddress();
  return (address == null) ? "0" : String.valueOf(address.getPort());
}
/** /**
* Start service. * Start service.
*/ */
@ -363,6 +395,7 @@ public void stop() {
clientRpcServer.stop(); clientRpcServer.stop();
LOG.info("Stopping the RPC server for DataNodes"); LOG.info("Stopping the RPC server for DataNodes");
datanodeRpcServer.stop(); datanodeRpcServer.stop();
unregisterMXBean();
} }
/** /**
@ -437,6 +470,15 @@ public int getNodeCount(SCMNodeManager.NODESTATE nodestate) {
return scmNodeManager.getNodeCount(nodestate); return scmNodeManager.getNodeCount(nodestate);
} }
@Override
public Map<String, Integer> getNodeCount() {
  // MXBean view of node counts: one entry per NODESTATE, keyed by the
  // state name so JMX clients see a simple String -> int table.
  Map<String, Integer> countMap = new HashMap<>();
  for (SCMNodeManager.NODESTATE state : SCMNodeManager.NODESTATE.values()) {
    countMap.put(state.toString(), scmNodeManager.getNodeCount(state));
  }
  return countMap;
}
/** /**
* Returns node manager. * Returns node manager.
* @return - Node Manager * @return - Node Manager

View File

@ -46,8 +46,8 @@
* DECOMMISSIONED - Someone told us to remove this node from the tracking * DECOMMISSIONED - Someone told us to remove this node from the tracking
* list, by calling removeNode. We will throw away this nodes info soon. * list, by calling removeNode. We will throw away this nodes info soon.
*/ */
public interface NodeManager extends StorageContainerNodeProtocol, Closeable, public interface NodeManager extends StorageContainerNodeProtocol,
Runnable { NodeManagerMXBean, Closeable, Runnable {
/** /**
* Removes a data node from the management of this Node Manager. * Removes a data node from the management of this Node Manager.
* *
@ -78,20 +78,6 @@ public interface NodeManager extends StorageContainerNodeProtocol, Closeable,
*/ */
List<DatanodeID> getAllNodes(); List<DatanodeID> getAllNodes();
/**
* Get the minimum number of nodes to get out of chill mode.
*
* @return int
*/
int getMinimumChillModeNodes();
/**
* Reports if we have exited out of chill mode by discovering enough nodes.
*
* @return True if we are out of Node layer chill mode, false otherwise.
*/
boolean isOutOfNodeChillMode();
/** /**
* Chill mode is the period when node manager waits for a minimum * Chill mode is the period when node manager waits for a minimum
* configured number of datanodes to report in. This is called chill mode * configured number of datanodes to report in. This is called chill mode
@ -112,19 +98,6 @@ public interface NodeManager extends StorageContainerNodeProtocol, Closeable,
*/ */
void clearChillModeFlag(); void clearChillModeFlag();
/**
* Returns a chill mode status string.
* @return String
*/
String getChillModeStatus();
/**
* Returns the status of manual chill mode flag.
* @return true if forceEnterChillMode has been called,
* false if forceExitChillMode or status is not set. eg. clearChillModeFlag.
*/
boolean isInManualChillMode();
/** /**
* Enum that represents the Node State. This is used in calls to getNodeList * Enum that represents the Node State. This is used in calls to getNodeList
* and getNodeCount. TODO: Add decommission when we support it. * and getNodeCount. TODO: Add decommission when we support it.

View File

@ -0,0 +1,66 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.scm.node;
import org.apache.hadoop.classification.InterfaceAudience;
import java.util.Map;
/**
 * This is the JMX management interface for node manager information.
 */
@InterfaceAudience.Private
public interface NodeManagerMXBean {

  /**
   * Get the minimum number of nodes required to get out of chill mode.
   *
   * @return int
   */
  int getMinimumChillModeNodes();

  /**
   * Reports if we have exited chill mode by discovering enough nodes.
   *
   * @return True if we are out of Node layer chill mode, false otherwise.
   */
  boolean isOutOfNodeChillMode();

  /**
   * Returns a chill mode status string.
   *
   * @return String
   */
  String getChillModeStatus();

  /**
   * Returns the status of the manual chill mode flag.
   *
   * @return true if forceEnterChillMode has been called,
   * false if forceExitChillMode or status is not set, e.g. clearChillModeFlag.
   */
  boolean isInManualChillMode();

  /**
   * Get the number of data nodes in each valid state; valid states
   * are defined by {@link SCMNodeManager.NODESTATE}.
   *
   * @return a mapping from node state to the number of nodes in that state
   */
  Map<String, Integer> getNodeCount();
}

View File

@ -25,6 +25,7 @@
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException; import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.ozone.OzoneClientUtils; import org.apache.hadoop.ozone.OzoneClientUtils;
import org.apache.hadoop.ozone.protocol.StorageContainerNodeProtocol; import org.apache.hadoop.ozone.protocol.StorageContainerNodeProtocol;
import org.apache.hadoop.ozone.protocol.VersionResponse; import org.apache.hadoop.ozone.protocol.VersionResponse;
@ -46,6 +47,7 @@
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import javax.management.ObjectName;
import java.io.IOException; import java.io.IOException;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
@ -123,7 +125,8 @@ public class SCMNodeManager
private final VersionInfo version; private final VersionInfo version;
private Optional<Boolean> inManualChillMode; private Optional<Boolean> inManualChillMode;
private final CommandQueue commandQueue; private final CommandQueue commandQueue;
// Node manager MXBean
private ObjectName nmInfoBean;
/** /**
* Constructs SCM machine Manager. * Constructs SCM machine Manager.
*/ */
@ -162,6 +165,20 @@ public SCMNodeManager(Configuration conf, String clusterID) {
Preconditions.checkState(heartbeatCheckerIntervalMs > 0); Preconditions.checkState(heartbeatCheckerIntervalMs > 0);
executorService.schedule(this, heartbeatCheckerIntervalMs, executorService.schedule(this, heartbeatCheckerIntervalMs,
TimeUnit.MILLISECONDS); TimeUnit.MILLISECONDS);
registerMXBean();
}
private void registerMXBean() {
  // Expose node manager info over JMX as
  // Hadoop:service=SCMNodeManager,name=SCMNodeManagerInfo.
  nmInfoBean = MBeans.register("SCMNodeManager",
      "SCMNodeManagerInfo", this);
}
// Unregisters the node manager MXBean from the platform MBean server.
// Idempotent: the bean name is cleared after the first successful
// unregister, so subsequent calls are no-ops.
private void unregisterMXBean() {
if(this.nmInfoBean != null) {
MBeans.unregister(this.nmInfoBean);
this.nmInfoBean = null;
}
} }
/** /**
@ -595,6 +612,7 @@ private void updateNodeStat(String datanodeID, SCMNodeReport nodeReport) {
@Override @Override
public void close() throws IOException { public void close() throws IOException {
executorService.shutdown(); executorService.shutdown();
unregisterMXBean();
try { try {
if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) { if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
executorService.shutdownNow(); executorService.shutdownNow();
@ -729,4 +747,13 @@ public List<SCMNodeStat> getNodeStats(){
return nodeStats.entrySet().stream().map( return nodeStats.entrySet().stream().map(
entry -> nodeStats.get(entry.getKey())).collect(Collectors.toList()); entry -> nodeStats.get(entry.getKey())).collect(Collectors.toList());
} }
@Override
public Map<String, Integer> getNodeCount() {
  // MXBean view of node counts: one entry per NODESTATE, keyed by the
  // state name, with the count delegated to getNodeCount(state).
  Map<String, Integer> nodeCountMap = new HashMap<>();
  for (NodeManager.NODESTATE state : NodeManager.NODESTATE.values()) {
    nodeCountMap.put(state.toString(), getNodeCount(state));
  }
  return nodeCountMap;
}
} }

View File

@ -106,6 +106,10 @@ public void shutdown() {
scm.join(); scm.join();
} }
/**
 * Returns the {@link StorageContainerManager} instance backing this
 * cluster, e.g. so tests can inspect its state directly.
 *
 * @return the cluster's StorageContainerManager
 */
public StorageContainerManager getStorageContainerManager() {
  return scm;
}
/** /**
* Creates an {@link OzoneClient} connected to this cluster's REST service. * Creates an {@link OzoneClient} connected to this cluster's REST service.
* Callers take ownership of the client and must close it when done. * Callers take ownership of the client and must close it when done.

View File

@ -0,0 +1,139 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.scm;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.ozone.scm.node.NodeManager;
import org.junit.BeforeClass;
import org.junit.AfterClass;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.util.Map;
import java.util.Iterator;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.TabularData;
/**
 * Verifies that SCM and SCMNodeManager publish their expected
 * attributes through JMX.
 */
public class TestSCMMXBean {

  public static final Log LOG = LogFactory.getLog(TestSCMMXBean.class);
  private static int numOfDatanodes = 1;
  private static MiniOzoneCluster cluster;
  private static OzoneConfiguration conf;
  private static StorageContainerManager scm;
  private static MBeanServer mbs;

  @BeforeClass
  public static void init() throws IOException {
    // Bring up a one-datanode cluster and grab the platform MBean server
    // so the tests can query the beans registered by SCM.
    conf = new OzoneConfiguration();
    cluster = new MiniOzoneCluster.Builder(conf)
        .numDataNodes(numOfDatanodes)
        .setHandlerType("distributed")
        .build();
    scm = cluster.getStorageContainerManager();
    mbs = ManagementFactory.getPlatformMBeanServer();
  }

  @AfterClass
  public static void shutdown() {
    IOUtils.cleanup(null, cluster);
  }

  @Test
  public void testSCMMXBean() throws Exception {
    ObjectName scmBean = new ObjectName(
        "Hadoop:service=StorageContainerManager,"
        + "name=StorageContainerManagerInfo");

    // Ports are published as strings; they must match the live SCM values.
    String datanodePort = (String) mbs.getAttribute(scmBean,
        "DatanodeRpcPort");
    assertEquals(scm.getDatanodeRpcPort(), datanodePort);

    String clientPort = (String) mbs.getAttribute(scmBean,
        "ClientRpcPort");
    assertEquals(scm.getClientRpcPort(), clientPort);

    // The node-count map is surfaced via JMX as TabularData.
    TabularData nodeCounts = (TabularData) mbs.getAttribute(scmBean,
        "NodeCount");
    verifyEquals(nodeCounts, scm.getNodeCount());
  }

  @Test
  public void testSCMNodeManagerMXBean() throws Exception {
    final NodeManager nodeManager = scm.getScmNodeManager();
    ObjectName nmBean = new ObjectName(
        "Hadoop:service=SCMNodeManager,name=SCMNodeManagerInfo");

    Integer minChillNodes = (Integer) mbs.getAttribute(nmBean,
        "MinimumChillModeNodes");
    assertEquals(nodeManager.getMinimumChillModeNodes(),
        minChillNodes.intValue());

    boolean outOfChillMode = (boolean) mbs.getAttribute(nmBean,
        "OutOfNodeChillMode");
    assertEquals(nodeManager.isOutOfNodeChillMode(), outOfChillMode);

    String chillModeStatus = (String) mbs.getAttribute(nmBean,
        "ChillModeStatus");
    assertEquals(nodeManager.getChillModeStatus(), chillModeStatus);

    boolean manualChillMode = (boolean) mbs.getAttribute(nmBean,
        "InManualChillMode");
    assertEquals(nodeManager.isInManualChillMode(), manualChillMode);

    TabularData nodeCounts = (TabularData) mbs.getAttribute(nmBean,
        "NodeCount");
    verifyEquals(nodeCounts, scm.getScmNodeManager().getNodeCount());
  }

  /**
   * Asserts that the TabularData fetched over JMX carries exactly the
   * state-to-count entries present in the expected map.
   */
  private void verifyEquals(TabularData data1,
      Map<String, Integer> data2) {
    if (data1 == null || data2 == null) {
      fail("Data should not be null.");
    }
    for (Object row : data1.values()) {
      // Every row of the table is a CompositeData holding (key, value).
      assertTrue(row instanceof CompositeData);
      CompositeData composite = (CompositeData) row;
      assertEquals(2, composite.values().size());
      Iterator<?> values = composite.values().iterator();
      String state = values.next().toString();
      int count = Integer.parseInt(values.next().toString());
      assertTrue(data2.containsKey(state));
      assertEquals(data2.get(state).intValue(), count);
    }
  }
}

View File

@ -30,8 +30,10 @@
import org.apache.hadoop.ozone.scm.node.SCMNodeStat; import org.apache.hadoop.ozone.scm.node.SCMNodeStat;
import java.io.IOException; import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map;
/** /**
* Test Helper for testing container Mapping. * Test Helper for testing container Mapping.
@ -263,4 +265,13 @@ public List<SCMCommand> sendHeartbeat(DatanodeID datanodeID,
SCMNodeReport nodeReport) { SCMNodeReport nodeReport) {
return null; return null;
} }
@Override
public Map<String, Integer> getNodeCount() {
  // Mirrors SCMNodeManager#getNodeCount(): one entry per NODESTATE,
  // keyed by state name, so MXBean consumers see the same shape.
  Map<String, Integer> nodeCountMap = new HashMap<>();
  for (NodeManager.NODESTATE state : NodeManager.NODESTATE.values()) {
    nodeCountMap.put(state.toString(), getNodeCount(state));
  }
  return nodeCountMap;
}
} }