HDFS-8953. DataNode Metrics logging (Contributed by Kanaka Kumar Avvaru)

This commit is contained in:
Vinayakumar B 2015-09-16 00:18:29 +05:30
parent ae5308fe1d
commit ce69c9b54c
10 changed files with 575 additions and 133 deletions

View File

@ -162,6 +162,20 @@ log4j.appender.NNMETRICSRFA.layout.ConversionPattern=%d{ISO8601} %m%n
log4j.appender.NNMETRICSRFA.MaxBackupIndex=1
log4j.appender.NNMETRICSRFA.MaxFileSize=64MB
#
# DataNode metrics logging.
# The default is to retain two datanode-metrics.log files up to 64MB each.
#
datanode.metrics.logger=INFO,NullAppender
log4j.logger.DataNodeMetricsLog=${datanode.metrics.logger}
log4j.additivity.DataNodeMetricsLog=false
log4j.appender.DNMETRICSRFA=org.apache.log4j.RollingFileAppender
log4j.appender.DNMETRICSRFA.File=${hadoop.log.dir}/datanode-metrics.log
log4j.appender.DNMETRICSRFA.layout=org.apache.log4j.PatternLayout
log4j.appender.DNMETRICSRFA.layout.ConversionPattern=%d{ISO8601} %m%n
log4j.appender.DNMETRICSRFA.MaxBackupIndex=1
log4j.appender.DNMETRICSRFA.MaxFileSize=64MB
#
# mapred audit logging
#

View File

@ -918,6 +918,8 @@ Release 2.8.0 - UNRELEASED
HDFS-9008. Balancer#Parameters class could use a builder pattern.
(Chris Trezzo via mingma)
HDFS-8953. DataNode Metrics logging (Kanaka Kumar Avvaru via vinayakumarb)
OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

View File

@ -364,6 +364,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
"dfs.namenode.metrics.logger.period.seconds";
public static final int DFS_NAMENODE_METRICS_LOGGER_PERIOD_SECONDS_DEFAULT =
600;
public static final String DFS_DATANODE_METRICS_LOGGER_PERIOD_SECONDS_KEY =
"dfs.datanode.metrics.logger.period.seconds";
public static final int DFS_DATANODE_METRICS_LOGGER_PERIOD_SECONDS_DEFAULT =
600;
public static final String DFS_BALANCER_MOVEDWINWIDTH_KEY = "dfs.balancer.movedWinWidth";
public static final long DFS_BALANCER_MOVEDWINWIDTH_DEFAULT = 5400*1000L;

View File

@ -0,0 +1,174 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.common;
import java.lang.management.ManagementFactory;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import javax.management.Attribute;
import javax.management.AttributeList;
import javax.management.MBeanAttributeInfo;
import javax.management.MBeanInfo;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.log4j.Appender;
import org.apache.log4j.AsyncAppender;
/**
* MetricsLoggerTask can be used as utility to dump metrics to log.
*/
public class MetricsLoggerTask implements Runnable {
public static final Log LOG = LogFactory.getLog(MetricsLoggerTask.class);
private static ObjectName objectName = null;
static {
try {
objectName = new ObjectName("Hadoop:*");
} catch (MalformedObjectNameException m) {
// This should not occur in practice since we pass
// a valid pattern to the constructor above.
}
}
private Log metricsLog;
private String nodeName;
private short maxLogLineLength;
public MetricsLoggerTask(Log metricsLog, String nodeName,
short maxLogLineLength) {
this.metricsLog = metricsLog;
this.nodeName = nodeName;
this.maxLogLineLength = maxLogLineLength;
}
/**
* Write metrics to the metrics appender when invoked.
*/
@Override
public void run() {
// Skip querying metrics if there are no known appenders.
if (!metricsLog.isInfoEnabled() || !hasAppenders(metricsLog)
|| objectName == null) {
return;
}
metricsLog.info(" >> Begin " + nodeName + " metrics dump");
final MBeanServer server = ManagementFactory.getPlatformMBeanServer();
// Iterate over each MBean.
for (final ObjectName mbeanName : server.queryNames(objectName, null)) {
try {
MBeanInfo mBeanInfo = server.getMBeanInfo(mbeanName);
final String mBeanNameName = MBeans.getMbeanNameName(mbeanName);
final Set<String> attributeNames = getFilteredAttributes(mBeanInfo);
final AttributeList attributes = server.getAttributes(mbeanName,
attributeNames.toArray(new String[attributeNames.size()]));
for (Object o : attributes) {
final Attribute attribute = (Attribute) o;
final Object value = attribute.getValue();
final String valueStr = (value != null) ? value.toString() : "null";
// Truncate the value if it is too long
metricsLog.info(mBeanNameName + ":" + attribute.getName() + "="
+ trimLine(valueStr));
}
} catch (Exception e) {
metricsLog.error("Failed to get " + nodeName + " metrics for mbean "
+ mbeanName.toString(), e);
}
}
metricsLog.info(" << End " + nodeName + " metrics dump");
}
private String trimLine(String valueStr) {
if (maxLogLineLength <= 0) {
return valueStr;
}
return (valueStr.length() < maxLogLineLength ? valueStr : valueStr
.substring(0, maxLogLineLength) + "...");
}
private static boolean hasAppenders(Log logger) {
if (!(logger instanceof Log4JLogger)) {
// Don't bother trying to determine the presence of appenders.
return true;
}
Log4JLogger log4JLogger = ((Log4JLogger) logger);
return log4JLogger.getLogger().getAllAppenders().hasMoreElements();
}
/**
* Get the list of attributes for the MBean, filtering out a few attribute
* types.
*/
private static Set<String> getFilteredAttributes(MBeanInfo mBeanInfo) {
Set<String> attributeNames = new HashSet<>();
for (MBeanAttributeInfo attributeInfo : mBeanInfo.getAttributes()) {
if (!attributeInfo.getType().equals(
"javax.management.openmbean.TabularData")
&& !attributeInfo.getType().equals(
"javax.management.openmbean.CompositeData")
&& !attributeInfo.getType().equals(
"[Ljavax.management.openmbean.CompositeData;")) {
attributeNames.add(attributeInfo.getName());
}
}
return attributeNames;
}
/**
* Make the metrics logger async and add all pre-existing appenders to the
* async appender.
*/
public static void makeMetricsLoggerAsync(Log metricsLog) {
if (!(metricsLog instanceof Log4JLogger)) {
LOG.warn("Metrics logging will not be async since "
+ "the logger is not log4j");
return;
}
org.apache.log4j.Logger logger = ((Log4JLogger) metricsLog).getLogger();
logger.setAdditivity(false); // Don't pollute actual logs with metrics dump
@SuppressWarnings("unchecked")
List<Appender> appenders = Collections.list(logger.getAllAppenders());
// failsafe against trying to async it more than once
if (!appenders.isEmpty() && !(appenders.get(0) instanceof AsyncAppender)) {
AsyncAppender asyncAppender = new AsyncAppender();
// change logger to have an async appender containing all the
// previously configured appenders
for (Appender appender : appenders) {
logger.removeAppender(appender);
asyncAppender.addAppender(appender);
}
logger.addAppender(asyncAppender);
}
}
}

View File

@ -46,6 +46,8 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STARTUP_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_METRICS_LOGGER_PERIOD_SECONDS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_METRICS_LOGGER_PERIOD_SECONDS_KEY;
import static org.apache.hadoop.util.ExitUtil.terminate;
import java.io.BufferedOutputStream;
@ -85,6 +87,8 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.management.ObjectName;
@ -148,6 +152,7 @@
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
import org.apache.hadoop.hdfs.server.common.MetricsLoggerTask;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources;
@ -277,6 +282,8 @@ public class DataNode extends ReconfigurableBase
Collections.unmodifiableList(
Arrays.asList(DFS_DATANODE_DATA_DIR_KEY));
public static final Log METRICS_LOG = LogFactory.getLog("DataNodeMetricsLog");
/**
* Use {@link NetUtils#createSocketAddr(String)} instead.
*/
@ -363,6 +370,8 @@ public static InetSocketAddress createSocketAddr(String target) {
private long[] oobTimeouts; /** timeout value of each OOB type */
private ScheduledThreadPoolExecutor metricsLoggerTimer;
/**
* Creates a dummy DataNode for testing purpose.
*/
@ -382,7 +391,7 @@ public static InetSocketAddress createSocketAddr(String target) {
/**
* Create the DataNode given a configuration, an array of dataDirs,
* and a namenode proxy
* and a namenode proxy.
*/
DataNode(final Configuration conf,
final List<StorageLocation> dataDirs,
@ -1166,6 +1175,7 @@ void startDataNode(Configuration conf,
saslClient = new SaslDataTransferClient(dnConf.conf,
dnConf.saslPropsResolver, dnConf.trustedChannelResolver);
saslServer = new SaslDataTransferServer(dnConf, blockPoolTokenSecretManager);
startMetricsLogger(conf);
}
/**
@ -1649,6 +1659,7 @@ private void checkBlockToken(ExtendedBlock block, Token<BlockTokenIdentifier> to
* Otherwise, deadlock might occur.
*/
public void shutdown() {
stopMetricsLogger();
if (plugins != null) {
for (ServicePlugin p : plugins) {
try {
@ -3276,4 +3287,41 @@ public long getOOBTimeout(Status status)
return oobTimeouts[status.getNumber() - Status.OOB_RESTART_VALUE];
}
/**
* Start a timer to periodically write DataNode metrics to the log file. This
* behavior can be disabled by configuration.
*
* @param metricConf
*/
protected void startMetricsLogger(Configuration metricConf) {
long metricsLoggerPeriodSec = metricConf.getInt(
DFS_DATANODE_METRICS_LOGGER_PERIOD_SECONDS_KEY,
DFS_DATANODE_METRICS_LOGGER_PERIOD_SECONDS_DEFAULT);
if (metricsLoggerPeriodSec <= 0) {
return;
}
MetricsLoggerTask.makeMetricsLoggerAsync(METRICS_LOG);
// Schedule the periodic logging.
metricsLoggerTimer = new ScheduledThreadPoolExecutor(1);
metricsLoggerTimer.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
metricsLoggerTimer.scheduleWithFixedDelay(new MetricsLoggerTask(METRICS_LOG,
"DataNode", (short) 0), metricsLoggerPeriodSec, metricsLoggerPeriodSec,
TimeUnit.SECONDS);
}
protected void stopMetricsLogger() {
if (metricsLoggerTimer != null) {
metricsLoggerTimer.shutdown();
metricsLoggerTimer = null;
}
}
@VisibleForTesting
ScheduledThreadPoolExecutor getMetricsLoggerTimer() {
return metricsLoggerTimer;
}
}

View File

@ -23,7 +23,6 @@
import com.google.common.collect.Lists;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
@ -45,6 +44,7 @@
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.RollingUpgradeStartupOption;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
import org.apache.hadoop.hdfs.server.common.MetricsLoggerTask;
import org.apache.hadoop.hdfs.server.namenode.ha.ActiveState;
import org.apache.hadoop.hdfs.server.namenode.ha.BootstrapStandby;
import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
@ -78,32 +78,20 @@
import org.apache.hadoop.util.JvmPauseMonitor;
import org.apache.hadoop.util.ServicePlugin;
import org.apache.hadoop.util.StringUtils;
import org.apache.log4j.Appender;
import org.apache.log4j.AsyncAppender;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.management.Attribute;
import javax.management.AttributeList;
import javax.management.MBeanAttributeInfo;
import javax.management.MBeanInfo;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import java.io.IOException;
import java.io.PrintStream;
import java.lang.management.ManagementFactory;
import java.net.InetSocketAddress;
import java.net.URI;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@ -711,46 +699,19 @@ protected void startMetricsLogger(Configuration conf) {
return;
}
makeMetricsLoggerAsync();
MetricsLoggerTask.makeMetricsLoggerAsync(MetricsLog);
// Schedule the periodic logging.
metricsLoggerTimer = new ScheduledThreadPoolExecutor(1);
metricsLoggerTimer.setExecuteExistingDelayedTasksAfterShutdownPolicy(
false);
metricsLoggerTimer.scheduleWithFixedDelay(new MetricsLoggerTask(),
metricsLoggerTimer.scheduleWithFixedDelay(new MetricsLoggerTask(MetricsLog,
"NameNode", (short) 128),
metricsLoggerPeriodSec,
metricsLoggerPeriodSec,
TimeUnit.SECONDS);
}
/**
* Make the metrics logger async and add all pre-existing appenders
* to the async appender.
*/
private static void makeMetricsLoggerAsync() {
if (!(MetricsLog instanceof Log4JLogger)) {
LOG.warn(
"Metrics logging will not be async since the logger is not log4j");
return;
}
org.apache.log4j.Logger logger = ((Log4JLogger) MetricsLog).getLogger();
logger.setAdditivity(false); // Don't pollute NN logs with metrics dump
@SuppressWarnings("unchecked")
List<Appender> appenders = Collections.list(logger.getAllAppenders());
// failsafe against trying to async it more than once
if (!appenders.isEmpty() && !(appenders.get(0) instanceof AsyncAppender)) {
AsyncAppender asyncAppender = new AsyncAppender();
// change logger to have an async appender containing all the
// previously configured appenders
for (Appender appender : appenders) {
logger.removeAppender(appender);
asyncAppender.addAppender(appender);
}
logger.addAppender(asyncAppender);
}
}
protected void stopMetricsLogger() {
if (metricsLoggerTimer != null) {
metricsLoggerTimer.shutdown();
@ -1925,91 +1886,4 @@ void checkHaStateChange(StateChangeRequestInfo req)
break;
}
}
private static class MetricsLoggerTask implements Runnable {
private static final int MAX_LOGGED_VALUE_LEN = 128;
private static ObjectName OBJECT_NAME = null;
static {
try {
OBJECT_NAME = new ObjectName("Hadoop:*");
} catch (MalformedObjectNameException m) {
// This should not occur in practice since we pass
// a valid pattern to the constructor above.
}
}
/**
* Write NameNode metrics to the metrics appender when invoked.
*/
@Override
public void run() {
// Skip querying metrics if there are no known appenders.
if (!MetricsLog.isInfoEnabled() ||
!hasAppenders(MetricsLog) ||
OBJECT_NAME == null) {
return;
}
MetricsLog.info(" >> Begin NameNode metrics dump");
final MBeanServer server = ManagementFactory.getPlatformMBeanServer();
// Iterate over each MBean.
for (final ObjectName mbeanName : server.queryNames(OBJECT_NAME, null)) {
try {
MBeanInfo mBeanInfo = server.getMBeanInfo(mbeanName);
final String mBeanNameName = MBeans.getMbeanNameName(mbeanName);
final Set<String> attributeNames = getFilteredAttributes(mBeanInfo);
final AttributeList attributes =
server.getAttributes(mbeanName,
attributeNames.toArray(new String[attributeNames.size()]));
for (Object o : attributes) {
final Attribute attribute = (Attribute) o;
final Object value = attribute.getValue();
final String valueStr =
(value != null) ? value.toString() : "null";
// Truncate the value if it is too long
MetricsLog.info(mBeanNameName + ":" + attribute.getName() + "=" +
(valueStr.length() < MAX_LOGGED_VALUE_LEN ? valueStr :
valueStr.substring(0, MAX_LOGGED_VALUE_LEN) + "..."));
}
} catch (Exception e) {
MetricsLog.error("Failed to get NameNode metrics for mbean " +
mbeanName.toString(), e);
}
}
MetricsLog.info(" << End NameNode metrics dump");
}
private static boolean hasAppenders(Log logger) {
if (!(logger instanceof Log4JLogger)) {
// Don't bother trying to determine the presence of appenders.
return true;
}
Log4JLogger log4JLogger = ((Log4JLogger) MetricsLog);
return log4JLogger.getLogger().getAllAppenders().hasMoreElements();
}
/**
* Get the list of attributes for the MBean, filtering out a few
* attribute types.
*/
private static Set<String> getFilteredAttributes(
MBeanInfo mBeanInfo) {
Set<String> attributeNames = new HashSet<>();
for (MBeanAttributeInfo attributeInfo : mBeanInfo.getAttributes()) {
if (!attributeInfo.getType().equals(
"javax.management.openmbean.TabularData") &&
!attributeInfo.getType().equals(
"javax.management.openmbean.CompositeData") &&
!attributeInfo.getType().equals(
"[Ljavax.management.openmbean.CompositeData;")) {
attributeNames.add(attributeInfo.getName());
}
}
return attributeNames;
}
}
}

View File

@ -1599,6 +1599,18 @@
</description>
</property>
<property>
<name>dfs.datanode.metrics.logger.period.seconds</name>
<value>600</value>
<description>
This setting controls how frequently the DataNode logs its metrics. The
logging configuration must also define one or more appenders for
DataNodeMetricsLog for the metrics to be logged.
DataNode metrics logging is disabled if this value is set to zero or
less than zero.
</description>
</property>
<property>
<name>dfs.metrics.percentiles.intervals</name>
<value></value>

View File

@ -19,21 +19,38 @@
package org.apache.hadoop.hdfs.server.datanode;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
import org.junit.Assert;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import com.google.common.base.Preconditions;
@ -44,7 +61,10 @@
public class DataNodeTestUtils {
private static final String DIR_FAILURE_SUFFIX = ".origin";
public static DatanodeRegistration
public final static String TEST_CLUSTER_ID = "testClusterID";
public final static String TEST_POOL_ID = "BP-TEST";
public static DatanodeRegistration
getDNRegistrationForBP(DataNode dn, String bpid) throws IOException {
return dn.getDNRegistrationForBP(bpid);
}
@ -231,4 +251,61 @@ public static void runDirectoryScanner(DataNode dn) throws IOException {
dn.getDirectoryScanner().reconcile();
}
}
/**
* Starts an instance of DataNode with NN mocked. Called should ensure to
* shutdown the DN
*
* @throws IOException
*/
public static DataNode startDNWithMockNN(Configuration conf,
final InetSocketAddress nnSocketAddr, final String dnDataDir)
throws IOException {
FileSystem.setDefaultUri(conf, "hdfs://" + nnSocketAddr.getHostName() + ":"
+ nnSocketAddr.getPort());
ArrayList<StorageLocation> locations = new ArrayList<StorageLocation>();
File dataDir = new File(dnDataDir);
FileUtil.fullyDelete(dataDir);
dataDir.mkdirs();
StorageLocation location = StorageLocation.parse(dataDir.getPath());
locations.add(location);
final DatanodeProtocolClientSideTranslatorPB namenode =
mock(DatanodeProtocolClientSideTranslatorPB.class);
Mockito.doAnswer(new Answer<DatanodeRegistration>() {
@Override
public DatanodeRegistration answer(InvocationOnMock invocation)
throws Throwable {
return (DatanodeRegistration) invocation.getArguments()[0];
}
}).when(namenode).registerDatanode(Mockito.any(DatanodeRegistration.class));
when(namenode.versionRequest()).thenReturn(
new NamespaceInfo(1, TEST_CLUSTER_ID, TEST_POOL_ID, 1L));
when(
namenode.sendHeartbeat(Mockito.any(DatanodeRegistration.class),
Mockito.any(StorageReport[].class), Mockito.anyLong(),
Mockito.anyLong(), Mockito.anyInt(), Mockito.anyInt(),
Mockito.anyInt(), Mockito.any(VolumeFailureSummary.class),
Mockito.anyBoolean())).thenReturn(
new HeartbeatResponse(new DatanodeCommand[0], new NNHAStatusHeartbeat(
HAServiceState.ACTIVE, 1), null, ThreadLocalRandom.current()
.nextLong() | 1L));
DataNode dn = new DataNode(conf, locations, null) {
@Override
DatanodeProtocolClientSideTranslatorPB connectToNN(
InetSocketAddress nnAddr) throws IOException {
Assert.assertEquals(nnSocketAddr, nnAddr);
return namenode;
}
};
// Trigger a heartbeat so that it acknowledges the NN as active.
dn.getAllBpOs().get(0).triggerHeartbeatForTests();
return dn;
}
}

View File

@ -0,0 +1,224 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_METRICS_LOGGER_PERIOD_SECONDS_KEY;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeoutException;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.log4j.Appender;
import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.AsyncAppender;
import org.apache.log4j.spi.LoggingEvent;
import org.junit.After;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import com.google.common.base.Supplier;
/**
* Test periodic logging of DataNode metrics.
*/
public class TestDataNodeMetricsLogger {
static final Log LOG = LogFactory.getLog(TestDataNodeMetricsLogger.class);
private static final String DATA_DIR = MiniDFSCluster.getBaseDirectory()
+ "data";
private final static InetSocketAddress NN_ADDR = new InetSocketAddress(
"localhost", 5020);
private DataNode dn;
static final Random random = new Random(System.currentTimeMillis());
@Rule
public Timeout timeout = new Timeout(300000);
/**
* Starts an instance of DataNode
*
* @throws IOException
*/
public void startDNForTest(boolean enableMetricsLogging) throws IOException {
Configuration conf = new HdfsConfiguration();
conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, DATA_DIR);
conf.set(DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY, "0.0.0.0:0");
conf.set(DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY, "0.0.0.0:0");
conf.set(DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY, "0.0.0.0:0");
conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
conf.setInt(DFS_DATANODE_METRICS_LOGGER_PERIOD_SECONDS_KEY,
enableMetricsLogging ? 1 : 0); // If enabled, log early and log often
dn = DataNodeTestUtils.startDNWithMockNN(conf, NN_ADDR, DATA_DIR);
}
/**
* Cleans the resources and closes the instance of datanode
*
* @throws IOException
* if an error occurred
*/
@After
public void tearDown() throws IOException {
if (dn != null) {
try {
dn.shutdown();
} catch (Exception e) {
LOG.error("Cannot close: ", e);
} finally {
File dir = new File(DATA_DIR);
if (dir.exists())
Assert.assertTrue("Cannot delete data-node dirs",
FileUtil.fullyDelete(dir));
}
}
dn = null;
}
@Test
public void testMetricsLoggerOnByDefault() throws IOException {
startDNForTest(true);
assertNotNull(dn);
assertNotNull(dn.getMetricsLoggerTimer());
}
@Test
public void testDisableMetricsLogger() throws IOException {
startDNForTest(false);
assertNotNull(dn);
assertNull(dn.getMetricsLoggerTimer());
}
@Test
public void testMetricsLoggerIsAsync() throws IOException {
startDNForTest(true);
assertNotNull(dn);
org.apache.log4j.Logger logger = ((Log4JLogger) DataNode.METRICS_LOG)
.getLogger();
@SuppressWarnings("unchecked")
List<Appender> appenders = Collections.list(logger.getAllAppenders());
assertTrue(appenders.get(0) instanceof AsyncAppender);
}
/**
* Publish a fake metric under the "Hadoop:" domain and ensure it is logged by
* the metrics logger.
*/
@Test
public void testMetricsLogOutput() throws IOException, InterruptedException,
TimeoutException {
TestFakeMetric metricsProvider = new TestFakeMetric();
MBeans.register(this.getClass().getSimpleName(), "DummyMetrics",
metricsProvider);
startDNForTest(true);
assertNotNull(dn);
final PatternMatchingAppender appender = new PatternMatchingAppender(
"^.*FakeMetric.*$");
addAppender(DataNode.METRICS_LOG, appender);
// Ensure that the supplied pattern was matched.
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
return appender.isMatched();
}
}, 1000, 60000);
dn.shutdown();
}
private void addAppender(Log log, Appender appender) {
org.apache.log4j.Logger logger = ((Log4JLogger) log).getLogger();
@SuppressWarnings("unchecked")
List<Appender> appenders = Collections.list(logger.getAllAppenders());
((AsyncAppender) appenders.get(0)).addAppender(appender);
}
public interface TestFakeMetricMXBean {
int getFakeMetric();
}
/**
* MBean for testing
*/
public static class TestFakeMetric implements TestFakeMetricMXBean {
@Override
public int getFakeMetric() {
return 0;
}
}
/**
* An appender that matches logged messages against the given regular
* expression.
*/
public static class PatternMatchingAppender extends AppenderSkeleton {
private final Pattern pattern;
private volatile boolean matched;
public PatternMatchingAppender(String pattern) {
this.pattern = Pattern.compile(pattern);
this.matched = false;
}
public boolean isMatched() {
return matched;
}
@Override
protected void append(LoggingEvent event) {
if (pattern.matcher(event.getMessage().toString()).matches()) {
matched = true;
}
}
@Override
public void close() {
}
@Override
public boolean requiresLayout() {
return false;
}
}
}

View File

@ -34,3 +34,16 @@ log4j.appender.NNMETRICSRFA.layout=org.apache.log4j.PatternLayout
log4j.appender.NNMETRICSRFA.layout.ConversionPattern=%d{ISO8601} %m%n
log4j.appender.NNMETRICSRFA.MaxBackupIndex=1
log4j.appender.NNMETRICSRFA.MaxFileSize=64MB
#
# DataNode metrics logging.
# The default is to retain two datanode-metrics.log files up to 64MB each.
#
log4j.logger.DataNodeMetricsLog=INFO,DNMETRICSRFA
log4j.additivity.DataNodeMetricsLog=false
log4j.appender.DNMETRICSRFA=org.apache.log4j.RollingFileAppender
log4j.appender.DNMETRICSRFA.File=${hadoop.log.dir}/datanode-metrics.log
log4j.appender.DNMETRICSRFA.layout=org.apache.log4j.PatternLayout
log4j.appender.DNMETRICSRFA.layout.ConversionPattern=%d{ISO8601} %m%n
log4j.appender.DNMETRICSRFA.MaxBackupIndex=1
log4j.appender.DNMETRICSRFA.MaxFileSize=64MB