HDFS-7331. Add Datanode network counts to datanode jmx page. Contributed by Charles Lamb.
commit 2d4f3e567e
parent b8c094b075
@@ -385,6 +385,9 @@ Release 2.7.0 - UNRELEASED
 
     HDFS-7420. Delegate permission checks to FSDirectory. (wheat9)
 
+    HDFS-7331. Add Datanode network counts to datanode jmx page. (Charles Lamb
+    via atm)
+
   OPTIMIZATIONS
 
   BUG FIXES
@@ -155,6 +155,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final float DFS_DATANODE_RAM_DISK_LOW_WATERMARK_PERCENT_DEFAULT = 10.0f;
   public static final String DFS_DATANODE_RAM_DISK_LOW_WATERMARK_BYTES = "dfs.datanode.ram.disk.low.watermark.bytes";
   public static final long DFS_DATANODE_RAM_DISK_LOW_WATERMARK_BYTES_DEFAULT = DFS_BLOCK_SIZE_DEFAULT;
+  public static final String DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_KEY = "dfs.datanode.network.counts.cache.max.size";
+  public static final int DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_DEFAULT = Integer.MAX_VALUE;
 
   // This setting is for testing/internal use only.
   public static final String DFS_DATANODE_DUPLICATE_REPLICA_DELETION = "dfs.datanode.duplicate.replica.deletion";
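Note: the two new constants add a knob that caps how many per-host count maps the datanode retains; the default of Integer.MAX_VALUE is effectively unbounded. A minimal sketch of resolving the knob, assuming only that a stock HdfsConfiguration is in scope; the 1024 override is illustrative, not from the commit:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;

public class NetworkCountsCacheSizeDemo {
  public static void main(String[] args) {
    Configuration conf = new HdfsConfiguration();
    // Illustrative override; in a real cluster this would come from
    // hdfs-site.xml via dfs.datanode.network.counts.cache.max.size.
    conf.setInt(DFSConfigKeys.DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_KEY, 1024);
    int max = conf.getInt(
        DFSConfigKeys.DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_KEY,
        DFSConfigKeys.DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_DEFAULT);
    System.out.println("network counts cache max size = " + max);
  }
}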
@@ -38,6 +38,8 @@
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PLUGINS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY;
@@ -77,6 +79,7 @@
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -84,6 +87,9 @@
 
 import javax.management.ObjectName;
 
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
 import com.google.common.collect.Lists;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -299,6 +305,9 @@ public static InetSocketAddress createSocketAddr(String target) {
   DataNodeMetrics metrics;
   private InetSocketAddress streamingAddr;
 
+  // See the note below in incrDatanodeNetworkErrors re: concurrency.
+  private LoadingCache<String, Map<String, Long>> datanodeNetworkCounts;
+
   private String hostName;
   private DatanodeID id;
 
@@ -414,6 +423,20 @@ public static InetSocketAddress createSocketAddr(String target) {
       shutdown();
       throw ie;
     }
+    final int dncCacheMaxSize =
+        conf.getInt(DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_KEY,
+            DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_DEFAULT);
+    datanodeNetworkCounts =
+        CacheBuilder.newBuilder()
+            .maximumSize(dncCacheMaxSize)
+            .build(new CacheLoader<String, Map<String, Long>>() {
+              @Override
+              public Map<String, Long> load(String key) throws Exception {
+                final Map<String, Long> ret = new HashMap<String, Long>();
+                ret.put("networkErrors", 0L);
+                return ret;
+              }
+            });
   }
 
   @Override
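For readers unfamiliar with Guava caches, the construction above means the per-host map is created lazily by the CacheLoader on the first get() for a host, and maximumSize() bounds how many host entries survive before least-recently-used eviction. A standalone sketch of the same pattern; the class name and the tiny cap of 2 are illustrative only:

import java.util.HashMap;
import java.util.Map;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;

public class LoadingCacheSketch {
  public static void main(String[] args) throws Exception {
    LoadingCache<String, Map<String, Long>> counts = CacheBuilder.newBuilder()
        .maximumSize(2)  // tiny cap so eviction is observable
        .build(new CacheLoader<String, Map<String, Long>>() {
          @Override
          public Map<String, Long> load(String key) {
            // Runs only on a cache miss; seeds the per-host map.
            final Map<String, Long> ret = new HashMap<String, Long>();
            ret.put("networkErrors", 0L);
            return ret;
          }
        });

    counts.get("10.0.0.1").put("networkErrors", 3L);  // miss, then mutate
    counts.get("10.0.0.2");                           // miss
    counts.get("10.0.0.3");                           // miss; may evict 10.0.0.1
    System.out.println(counts.asMap());               // the live view the MXBean exposes
  }
}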
@@ -1768,6 +1791,30 @@ public int getXceiverCount() {
     return threadGroup == null ? 0 : threadGroup.activeCount();
   }
 
+  @Override // DataNodeMXBean
+  public Map<String, Map<String, Long>> getDatanodeNetworkCounts() {
+    return datanodeNetworkCounts.asMap();
+  }
+
+  void incrDatanodeNetworkErrors(String host) {
+    metrics.incrDatanodeNetworkErrors();
+
+    /*
+     * Synchronizing on the whole cache is a big hammer, but since it's only
+     * accumulating errors, it should be ok. If this is ever expanded to include
+     * non-error stats, then finer-grained concurrency should be applied.
+     */
+    synchronized (datanodeNetworkCounts) {
+      try {
+        final Map<String, Long> curCount = datanodeNetworkCounts.get(host);
+        curCount.put("networkErrors", curCount.get("networkErrors") + 1L);
+        datanodeNetworkCounts.put(host, curCount);
+      } catch (ExecutionException e) {
+        LOG.warn("failed to increment network error counts for " + host);
+      }
+    }
+  }
+
   int getXmitsInProgress() {
     return xmitsInProgress.get();
   }
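The in-code comment concedes that synchronizing on the whole cache is coarse. A hedged sketch of the finer-grained alternative it hints at, using AtomicLong counters so increments avoid the global lock; this is not part of the commit, only an illustration:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicLong;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;

class FinerGrainedNetworkCounts {
  private final LoadingCache<String, Map<String, AtomicLong>> counts =
      CacheBuilder.newBuilder()
          .build(new CacheLoader<String, Map<String, AtomicLong>>() {
            @Override
            public Map<String, AtomicLong> load(String key) {
              Map<String, AtomicLong> ret =
                  new ConcurrentHashMap<String, AtomicLong>();
              ret.put("networkErrors", new AtomicLong(0));
              return ret;
            }
          });

  void incr(String host) {
    try {
      // Atomic increment; no lock on the whole cache is needed.
      counts.get(host).get("networkErrors").incrementAndGet();
    } catch (ExecutionException e) {
      // Mirror the commit's behavior: log and move on.
    }
  }
}

A getter would then have to translate the AtomicLong values back to plain Longs for the MXBean, which is presumably why the commit keeps the simpler synchronized form.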
@ -20,6 +20,8 @@
|
|||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
*
|
||||||
* This is the JMX management interface for data node information
|
* This is the JMX management interface for data node information
|
||||||
@@ -76,4 +78,9 @@ public interface DataNodeMXBean {
    * actively transferring blocks.
    */
  public int getXceiverCount();
+
+  /**
+   * Gets the network error counts on a per-Datanode basis.
+   */
+  public Map<String, Map<String, Long>> getDatanodeNetworkCounts();
 }
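Because the DataNode registers this bean as Hadoop:service=DataNode,name=DataNodeInfo (the ObjectName the test below uses), the new attribute is reachable from any JMX client. A sketch of a remote reader, assuming the datanode JVM has remote JMX enabled; the host and port in the service URL are placeholders:

import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class DatanodeNetworkCountsReader {
  public static void main(String[] args) throws Exception {
    JMXServiceURL url = new JMXServiceURL(
        "service:jmx:rmi:///jndi/rmi://dn-host:8006/jmxrmi");  // placeholder address
    JMXConnector jmxc = JMXConnectorFactory.connect(url);
    try {
      MBeanServerConnection mbsc = jmxc.getMBeanServerConnection();
      ObjectName name =
          new ObjectName("Hadoop:service=DataNode,name=DataNodeInfo");
      // The attribute name is the getter name minus the "get" prefix.
      Object counts = mbsc.getAttribute(name, "DatanodeNetworkCounts");
      System.out.println(counts);
    } finally {
      jmxc.close();
    }
  }
}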
@@ -97,6 +97,7 @@ class DataXceiver extends Receiver implements Runnable {
 
   private Peer peer;
   private final String remoteAddress; // address of remote side
+  private final String remoteAddressWithoutPort; // only the address, no port
   private final String localAddress; // local address of this daemon
   private final DataNode datanode;
   private final DNConf dnConf;
@@ -129,6 +130,9 @@ private DataXceiver(Peer peer, DataNode datanode,
     this.dataXceiverServer = dataXceiverServer;
     this.connectToDnViaHostname = datanode.getDnConf().connectToDnViaHostname;
     remoteAddress = peer.getRemoteAddressString();
+    final int colonIdx = remoteAddress.indexOf(':');
+    remoteAddressWithoutPort =
+        (colonIdx < 0) ? remoteAddress : remoteAddress.substring(0, colonIdx);
     localAddress = peer.getLocalAddressString();
 
     if (LOG.isDebugEnabled()) {
@@ -222,7 +226,7 @@ public void run() {
           LOG.debug("Cached " + peer + " closing after " + opsProcessed + " ops");
         }
       } else {
-        datanode.metrics.incrDatanodeNetworkErrors();
+        incrDatanodeNetworkErrors();
         throw err;
       }
       break;
@@ -521,7 +525,7 @@ public void readBlock(final ExtendedBlock block,
         } catch (IOException ioe) {
           LOG.debug("Error reading client status response. Will close connection.", ioe);
           IOUtils.closeStream(out);
-          datanode.metrics.incrDatanodeNetworkErrors();
+          incrDatanodeNetworkErrors();
         }
       } else {
         IOUtils.closeStream(out);
@@ -543,7 +547,7 @@ public void readBlock(final ExtendedBlock block,
       if (!(ioe instanceof SocketTimeoutException)) {
         LOG.warn(dnR + ":Got exception while serving " + block + " to "
           + remoteAddress, ioe);
-        datanode.metrics.incrDatanodeNetworkErrors();
+        incrDatanodeNetworkErrors();
       }
       throw ioe;
     } finally {
@@ -722,7 +726,7 @@ public void writeBlock(final ExtendedBlock block,
           LOG.info(datanode + ":Exception transfering " +
                    block + " to mirror " + mirrorNode +
                    "- continuing without the mirror", e);
-          datanode.metrics.incrDatanodeNetworkErrors();
+          incrDatanodeNetworkErrors();
         }
       }
     }
@@ -777,7 +781,7 @@ public void writeBlock(final ExtendedBlock block,
 
     } catch (IOException ioe) {
       LOG.info("opWriteBlock " + block + " received exception " + ioe);
-      datanode.metrics.incrDatanodeNetworkErrors();
+      incrDatanodeNetworkErrors();
       throw ioe;
     } finally {
       // close all opened streams
@@ -813,7 +817,7 @@ public void transferBlock(final ExtendedBlock blk,
       writeResponse(Status.SUCCESS, null, out);
     } catch (IOException ioe) {
       LOG.info("transferBlock " + blk + " received exception " + ioe);
-      datanode.metrics.incrDatanodeNetworkErrors();
+      incrDatanodeNetworkErrors();
       throw ioe;
     } finally {
       IOUtils.closeStream(out);
@@ -908,7 +912,7 @@ public void blockChecksum(final ExtendedBlock block,
       out.flush();
     } catch (IOException ioe) {
       LOG.info("blockChecksum " + block + " received exception " + ioe);
-      datanode.metrics.incrDatanodeNetworkErrors();
+      incrDatanodeNetworkErrors();
       throw ioe;
     } finally {
       IOUtils.closeStream(out);
@@ -975,7 +979,7 @@ public void copyBlock(final ExtendedBlock block,
     } catch (IOException ioe) {
       isOpSuccess = false;
       LOG.info("opCopyBlock " + block + " received exception " + ioe);
-      datanode.metrics.incrDatanodeNetworkErrors();
+      incrDatanodeNetworkErrors();
       throw ioe;
     } finally {
       dataXceiverServer.balanceThrottler.release();
@@ -1108,7 +1112,7 @@ public void replaceBlock(final ExtendedBlock block,
       LOG.info(errMsg);
       if (!IoeDuringCopyBlockOperation) {
         // Don't double count IO errors
-        datanode.metrics.incrDatanodeNetworkErrors();
+        incrDatanodeNetworkErrors();
       }
       throw ioe;
     } finally {
@@ -1128,7 +1132,7 @@ public void replaceBlock(final ExtendedBlock block,
       sendResponse(opStatus, errMsg);
     } catch (IOException ioe) {
       LOG.warn("Error writing reply back to " + peer.getRemoteAddressString());
-      datanode.metrics.incrDatanodeNetworkErrors();
+      incrDatanodeNetworkErrors();
     }
     IOUtils.closeStream(proxyOut);
     IOUtils.closeStream(blockReceiver);
@@ -1182,6 +1186,9 @@ private void writeSuccessWithChecksumInfo(BlockSender blockSender,
     out.flush();
   }
 
+  private void incrDatanodeNetworkErrors() {
+    datanode.incrDatanodeNetworkErrors(remoteAddressWithoutPort);
+  }
+
   private void checkAccess(OutputStream out, final boolean reply,
       final ExtendedBlock blk,
@@ -27,7 +27,9 @@
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.lang.management.ManagementFactory;
 import java.util.List;
+import java.util.Map;
 
 import com.google.common.collect.Lists;
 import org.apache.commons.logging.Log;
@@ -48,6 +50,9 @@
 import org.junit.Test;
 import org.mockito.Mockito;
 
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
 public class TestDataNodeMetrics {
   private static final Log LOG = LogFactory.getLog(TestDataNodeMetrics.class);
 
@@ -217,9 +222,22 @@ public void testTimeoutMetric() throws Exception {
       out.writeBytes("old gs data\n");
       out.hflush();
 
+      /* Test the metric. */
       final MetricsRecordBuilder dnMetrics =
           getMetrics(cluster.getDataNodes().get(0).getMetrics().name());
       assertCounter("DatanodeNetworkErrors", 1L, dnMetrics);
+
+      /* Test JMX datanode network counts. */
+      final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+      final ObjectName mxbeanName =
+          new ObjectName("Hadoop:service=DataNode,name=DataNodeInfo");
+      final Object dnc =
+          mbs.getAttribute(mxbeanName, "DatanodeNetworkCounts");
+      final String allDnc = dnc.toString();
+      assertTrue("expected to see loopback address",
+          allDnc.indexOf("127.0.0.1") >= 0);
+      assertTrue("expected to see networkErrors",
+          allDnc.indexOf("networkErrors") >= 0);
     } finally {
       IOUtils.cleanup(LOG, streams.toArray(new Closeable[0]));
       if (cluster != null) {
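The "jmx page" in the summary is the datanode's HTTP /jmx servlet, which renders the same bean as JSON. A sketch of fetching it over HTTP, assuming the default datanode web port of the era (50075) and a placeholder hostname; adjust both for your cluster:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URL;

public class DatanodeJmxPageDemo {
  public static void main(String[] args) throws Exception {
    // qry= narrows the JSON to the DataNodeInfo bean, which now carries
    // the DatanodeNetworkCounts attribute added by this commit.
    URL url = new URL("http://dn-host:50075/jmx"
        + "?qry=Hadoop:service=DataNode,name=DataNodeInfo");
    BufferedReader in = new BufferedReader(
        new InputStreamReader(url.openStream(), "UTF-8"));
    try {
      String line;
      while ((line = in.readLine()) != null) {
        System.out.println(line);
      }
    } finally {
      in.close();
    }
  }
}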