HADOOP-12210. Collect network usage on the node. Contributed by Robert Grandl

This commit is contained in:
Chris Douglas 2015-07-06 17:28:20 -07:00
parent 0e602fa3a1
commit 1a0752d85a
7 changed files with 195 additions and 2 deletions

View File

@ -693,6 +693,8 @@ Release 2.8.0 - UNRELEASED
HADOOP-12180. Move ResourceCalculatorPlugin from YARN to Common.
(Chris Douglas via kasha)
HADOOP-12210. Collect network usage on the node (Robert Grandl via cdouglas)
OPTIMIZATIONS
HADOOP-11785. Reduce the number of listStatus operation in distcp

View File

@ -108,4 +108,16 @@ public static SysInfo newInstance() {
*/
public abstract float getCpuUsage();
/**
* Obtain the aggregated number of bytes read over the network.
* @return total number of bytes read.
*/
public abstract long getNetworkBytesRead();
/**
* Obtain the aggregated number of bytes written to the network.
* @return total number of bytes written.
*/
public abstract long getNetworkBytesWritten();
}

View File

@ -83,9 +83,22 @@ public class SysInfoLinux extends SysInfo {
"[ \t]*([0-9]*)[ \t]*([0-9]*)[ \t].*");
private CpuTimeTracker cpuTimeTracker;
/**
* Pattern for parsing /proc/net/dev.
*/
private static final String PROCFS_NETFILE = "/proc/net/dev";
private static final Pattern PROCFS_NETFILE_FORMAT =
Pattern .compile("^[ \t]*([a-zA-Z]+[0-9]*):" +
"[ \t]*([0-9]+)[ \t]*([0-9]+)[ \t]*([0-9]+)[ \t]*([0-9]+)" +
"[ \t]*([0-9]+)[ \t]*([0-9]+)[ \t]*([0-9]+)[ \t]*([0-9]+)" +
"[ \t]*([0-9]+)[ \t]*([0-9]+)[ \t]*([0-9]+)[ \t]*([0-9]+)" +
"[ \t]*([0-9]+)[ \t]*([0-9]+)[ \t]*([0-9]+)[ \t]*([0-9]+).*");
private String procfsMemFile;
private String procfsCpuFile;
private String procfsStatFile;
private String procfsNetFile;
private long jiffyLengthInMillis;
private long ramSize = 0;
@ -98,6 +111,8 @@ public class SysInfoLinux extends SysInfo {
/* number of physical cores on the system. */
private int numCores = 0;
private long cpuFrequency = 0L; // CPU frequency on the system (kHz)
private long numNetBytesRead = 0L; // aggregated bytes read from network
private long numNetBytesWritten = 0L; // aggregated bytes written to network
private boolean readMemInfoFile = false;
private boolean readCpuInfoFile = false;
@ -130,7 +145,7 @@ long getCurrentTime() {
public SysInfoLinux() {
this(PROCFS_MEMFILE, PROCFS_CPUINFO, PROCFS_STAT,
JIFFY_LENGTH_IN_MILLIS);
PROCFS_NETFILE, JIFFY_LENGTH_IN_MILLIS);
}
/**
@ -139,16 +154,19 @@ public SysInfoLinux() {
* @param procfsMemFile fake file for /proc/meminfo
* @param procfsCpuFile fake file for /proc/cpuinfo
* @param procfsStatFile fake file for /proc/stat
* @param procfsNetFile fake file for /proc/net/dev
* @param jiffyLengthInMillis fake jiffy length value
*/
@VisibleForTesting
public SysInfoLinux(String procfsMemFile,
String procfsCpuFile,
String procfsStatFile,
String procfsNetFile,
long jiffyLengthInMillis) {
this.procfsMemFile = procfsMemFile;
this.procfsCpuFile = procfsCpuFile;
this.procfsStatFile = procfsStatFile;
this.procfsNetFile = procfsNetFile;
this.jiffyLengthInMillis = jiffyLengthInMillis;
this.cpuTimeTracker = new CpuTimeTracker(jiffyLengthInMillis);
}
@ -338,6 +356,61 @@ private void readProcStatFile() {
}
}
/**
* Read /proc/net/dev file, parse and calculate amount
* of bytes read and written through the network.
*/
private void readProcNetInfoFile() {
numNetBytesRead = 0L;
numNetBytesWritten = 0L;
// Read "/proc/net/dev" file
BufferedReader in;
InputStreamReader fReader;
try {
fReader = new InputStreamReader(
new FileInputStream(procfsNetFile), Charset.forName("UTF-8"));
in = new BufferedReader(fReader);
} catch (FileNotFoundException f) {
return;
}
Matcher mat;
try {
String str = in.readLine();
while (str != null) {
mat = PROCFS_NETFILE_FORMAT.matcher(str);
if (mat.find()) {
assert mat.groupCount() >= 16;
// ignore loopback interfaces
if (mat.group(1).equals("lo")) {
str = in.readLine();
continue;
}
numNetBytesRead += Long.parseLong(mat.group(2));
numNetBytesWritten += Long.parseLong(mat.group(10));
}
str = in.readLine();
}
} catch (IOException io) {
LOG.warn("Error reading the stream " + io);
} finally {
// Close the streams
try {
fReader.close();
try {
in.close();
} catch (IOException i) {
LOG.warn("Error closing the stream " + in);
}
} catch (IOException i) {
LOG.warn("Error closing the stream " + fReader);
}
}
}
/** {@inheritDoc} */
@Override
public long getPhysicalMemorySize() {
@ -405,6 +478,20 @@ public float getCpuUsage() {
return overallCpuUsage;
}
/** {@inheritDoc} */
@Override
public long getNetworkBytesRead() {
readProcNetInfoFile();
return numNetBytesRead;
}
/** {@inheritDoc} */
@Override
public long getNetworkBytesWritten() {
readProcNetInfoFile();
return numNetBytesWritten;
}
/**
* Test the {@link SysInfoLinux}.
*
@ -424,6 +511,10 @@ public static void main(String[] args) {
System.out.println("CPU frequency (kHz) : " + plugin.getCpuFrequency());
System.out.println("Cumulative CPU time (ms) : " +
plugin.getCumulativeCpuTime());
System.out.println("Total network read (bytes) : "
+ plugin.getNetworkBytesRead());
System.out.println("Total network written (bytes) : "
+ plugin.getNetworkBytesWritten());
try {
// Sleep so we can compute the CPU usage
Thread.sleep(500L);

View File

@ -178,4 +178,19 @@ public float getCpuUsage() {
refreshIfNeeded();
return cpuUsage;
}
/** {@inheritDoc} */
@Override
public long getNetworkBytesRead() {
// TODO unimplemented
return 0L;
}
/** {@inheritDoc} */
@Override
public long getNetworkBytesWritten() {
// TODO unimplemented
return 0L;
}
}

View File

@ -44,8 +44,10 @@ static class FakeLinuxResourceCalculatorPlugin extends
public FakeLinuxResourceCalculatorPlugin(String procfsMemFile,
String procfsCpuFile,
String procfsStatFile,
String procfsNetFile,
long jiffyLengthInMillis) {
super(procfsMemFile, procfsCpuFile, procfsStatFile, jiffyLengthInMillis);
super(procfsMemFile, procfsCpuFile, procfsStatFile, procfsNetFile,
jiffyLengthInMillis);
}
@Override
long getCurrentTime() {
@ -61,14 +63,17 @@ public void advanceTime(long adv) {
private static final String FAKE_MEMFILE;
private static final String FAKE_CPUFILE;
private static final String FAKE_STATFILE;
private static final String FAKE_NETFILE;
private static final long FAKE_JIFFY_LENGTH = 10L;
static {
int randomNum = (new Random()).nextInt(1000000000);
FAKE_MEMFILE = TEST_ROOT_DIR + File.separator + "MEMINFO_" + randomNum;
FAKE_CPUFILE = TEST_ROOT_DIR + File.separator + "CPUINFO_" + randomNum;
FAKE_STATFILE = TEST_ROOT_DIR + File.separator + "STATINFO_" + randomNum;
FAKE_NETFILE = TEST_ROOT_DIR + File.separator + "NETINFO_" + randomNum;
plugin = new FakeLinuxResourceCalculatorPlugin(FAKE_MEMFILE, FAKE_CPUFILE,
FAKE_STATFILE,
FAKE_NETFILE,
FAKE_JIFFY_LENGTH);
}
static final String MEMINFO_FORMAT =
@ -141,6 +146,17 @@ public void advanceTime(long adv) {
"procs_running 1\n" +
"procs_blocked 0\n";
static final String NETINFO_FORMAT =
"Inter-| Receive | Transmit\n"+
"face |bytes packets errs drop fifo frame compressed multicast|bytes packets"+
"errs drop fifo colls carrier compressed\n"+
" lo: 42236310 563003 0 0 0 0 0 0 42236310 563003 " +
"0 0 0 0 0 0\n"+
" eth0: %d 3452527 0 0 0 0 0 299787 %d 1866280 0 0 " +
"0 0 0 0\n"+
" eth1: %d 3152521 0 0 0 0 0 219781 %d 1866290 0 0 " +
"0 0 0 0\n";
/**
* Test parsing /proc/stat and /proc/cpuinfo
* @throws IOException
@ -320,4 +336,26 @@ private void writeFakeCPUInfoFile(String content) throws IOException {
IOUtils.closeQuietly(fWriter);
}
}
/**
* Test parsing /proc/net/dev
* @throws IOException
*/
@Test
public void parsingProcNetFile() throws IOException {
long numBytesReadIntf1 = 2097172468L;
long numBytesWrittenIntf1 = 1355620114L;
long numBytesReadIntf2 = 1097172460L;
long numBytesWrittenIntf2 = 1055620110L;
File tempFile = new File(FAKE_NETFILE);
tempFile.deleteOnExit();
FileWriter fWriter = new FileWriter(FAKE_NETFILE);
fWriter.write(String.format(NETINFO_FORMAT,
numBytesReadIntf1, numBytesWrittenIntf1,
numBytesReadIntf2, numBytesWrittenIntf2));
fWriter.close();
assertEquals(plugin.getNetworkBytesRead(), numBytesReadIntf1 + numBytesReadIntf2);
assertEquals(plugin.getNetworkBytesWritten(), numBytesWrittenIntf1 + numBytesWrittenIntf2);
}
}

View File

@ -48,6 +48,12 @@ public class DummyResourceCalculatorPlugin extends ResourceCalculatorPlugin {
"mapred.tasktracker.cumulativecputime.testing";
/** CPU usage percentage for testing */
public static final String CPU_USAGE = "mapred.tasktracker.cpuusage.testing";
/** cumulative number of bytes read over the network */
public static final String NETWORK_BYTES_READ =
"mapred.tasktracker.networkread.testing";
/** cumulative number of bytes written over the network */
public static final String NETWORK_BYTES_WRITTEN =
"mapred.tasktracker.networkwritten.testing";
/** process cumulative CPU usage time for testing */
public static final String PROC_CUMULATIVE_CPU_TIME =
"mapred.tasktracker.proccumulativecputime.testing";
@ -111,4 +117,17 @@ public long getCumulativeCpuTime() {
public float getCpuUsage() {
return getConf().getFloat(CPU_USAGE, -1);
}
/** {@inheritDoc} */
@Override
public long getNetworkBytesRead() {
return getConf().getLong(NETWORK_BYTES_READ, -1);
}
/** {@inheritDoc} */
@Override
public long getNetworkBytesWritten() {
return getConf().getLong(NETWORK_BYTES_WRITTEN, -1);
}
}

View File

@ -124,6 +124,22 @@ public float getCpuUsage() {
return sys.getCpuUsage();
}
/**
* Obtain the aggregated number of bytes read over the network.
* @return total number of bytes read.
*/
public long getNetworkBytesRead() {
return sys.getNetworkBytesRead();
}
/**
* Obtain the aggregated number of bytes written to the network.
* @return total number of bytes written.
*/
public long getNetworkBytesWritten() {
return sys.getNetworkBytesWritten();
}
/**
* Create the ResourceCalculatorPlugin from the class name and configure it. If
* class name is null, this method will try and return a memory calculator