From 1a0752d85a15499d120b4a79af9bd740fcd1f8e0 Mon Sep 17 00:00:00 2001 From: Chris Douglas Date: Mon, 6 Jul 2015 17:28:20 -0700 Subject: [PATCH] HADOOP-12210. Collect network usage on the node. Contributed by Robert Grandl --- .../hadoop-common/CHANGES.txt | 2 + .../java/org/apache/hadoop/util/SysInfo.java | 12 +++ .../org/apache/hadoop/util/SysInfoLinux.java | 93 ++++++++++++++++++- .../apache/hadoop/util/SysInfoWindows.java | 15 +++ .../apache/hadoop/util/TestSysInfoLinux.java | 40 +++++++- .../DummyResourceCalculatorPlugin.java | 19 ++++ .../yarn/util/ResourceCalculatorPlugin.java | 16 ++++ 7 files changed, 195 insertions(+), 2 deletions(-) diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index d9a9ebac8d..3d4f1e4c79 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -693,6 +693,8 @@ Release 2.8.0 - UNRELEASED HADOOP-12180. Move ResourceCalculatorPlugin from YARN to Common. (Chris Douglas via kasha) + HADOOP-12210. Collect network usage on the node (Robert Grandl via cdouglas) + OPTIMIZATIONS HADOOP-11785. Reduce the number of listStatus operation in distcp diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfo.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfo.java index ec7fb24012..24b339d1e0 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfo.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfo.java @@ -108,4 +108,16 @@ public static SysInfo newInstance() { */ public abstract float getCpuUsage(); + /** + * Obtain the aggregated number of bytes read over the network. + * @return total number of bytes read. + */ + public abstract long getNetworkBytesRead(); + + /** + * Obtain the aggregated number of bytes written to the network. + * @return total number of bytes written. + */ + public abstract long getNetworkBytesWritten(); + } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfoLinux.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfoLinux.java index 055298db18..880198546a 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfoLinux.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfoLinux.java @@ -83,9 +83,22 @@ public class SysInfoLinux extends SysInfo { "[ \t]*([0-9]*)[ \t]*([0-9]*)[ \t].*"); private CpuTimeTracker cpuTimeTracker; + /** + * Pattern for parsing /proc/net/dev. + */ + private static final String PROCFS_NETFILE = "/proc/net/dev"; + private static final Pattern PROCFS_NETFILE_FORMAT = + Pattern .compile("^[ \t]*([a-zA-Z]+[0-9]*):" + + "[ \t]*([0-9]+)[ \t]*([0-9]+)[ \t]*([0-9]+)[ \t]*([0-9]+)" + + "[ \t]*([0-9]+)[ \t]*([0-9]+)[ \t]*([0-9]+)[ \t]*([0-9]+)" + + "[ \t]*([0-9]+)[ \t]*([0-9]+)[ \t]*([0-9]+)[ \t]*([0-9]+)" + + "[ \t]*([0-9]+)[ \t]*([0-9]+)[ \t]*([0-9]+)[ \t]*([0-9]+).*"); + + private String procfsMemFile; private String procfsCpuFile; private String procfsStatFile; + private String procfsNetFile; private long jiffyLengthInMillis; private long ramSize = 0; @@ -98,6 +111,8 @@ public class SysInfoLinux extends SysInfo { /* number of physical cores on the system. */ private int numCores = 0; private long cpuFrequency = 0L; // CPU frequency on the system (kHz) + private long numNetBytesRead = 0L; // aggregated bytes read from network + private long numNetBytesWritten = 0L; // aggregated bytes written to network private boolean readMemInfoFile = false; private boolean readCpuInfoFile = false; @@ -130,7 +145,7 @@ long getCurrentTime() { public SysInfoLinux() { this(PROCFS_MEMFILE, PROCFS_CPUINFO, PROCFS_STAT, - JIFFY_LENGTH_IN_MILLIS); + PROCFS_NETFILE, JIFFY_LENGTH_IN_MILLIS); } /** @@ -139,16 +154,19 @@ public SysInfoLinux() { * @param procfsMemFile fake file for /proc/meminfo * @param procfsCpuFile fake file for /proc/cpuinfo * @param procfsStatFile fake file for /proc/stat + * @param procfsNetFile fake file for /proc/net/dev * @param jiffyLengthInMillis fake jiffy length value */ @VisibleForTesting public SysInfoLinux(String procfsMemFile, String procfsCpuFile, String procfsStatFile, + String procfsNetFile, long jiffyLengthInMillis) { this.procfsMemFile = procfsMemFile; this.procfsCpuFile = procfsCpuFile; this.procfsStatFile = procfsStatFile; + this.procfsNetFile = procfsNetFile; this.jiffyLengthInMillis = jiffyLengthInMillis; this.cpuTimeTracker = new CpuTimeTracker(jiffyLengthInMillis); } @@ -338,6 +356,61 @@ private void readProcStatFile() { } } + /** + * Read /proc/net/dev file, parse and calculate amount + * of bytes read and written through the network. + */ + private void readProcNetInfoFile() { + + numNetBytesRead = 0L; + numNetBytesWritten = 0L; + + // Read "/proc/net/dev" file + BufferedReader in; + InputStreamReader fReader; + try { + fReader = new InputStreamReader( + new FileInputStream(procfsNetFile), Charset.forName("UTF-8")); + in = new BufferedReader(fReader); + } catch (FileNotFoundException f) { + return; + } + + Matcher mat; + try { + String str = in.readLine(); + while (str != null) { + mat = PROCFS_NETFILE_FORMAT.matcher(str); + if (mat.find()) { + assert mat.groupCount() >= 16; + + // ignore loopback interfaces + if (mat.group(1).equals("lo")) { + str = in.readLine(); + continue; + } + numNetBytesRead += Long.parseLong(mat.group(2)); + numNetBytesWritten += Long.parseLong(mat.group(10)); + } + str = in.readLine(); + } + } catch (IOException io) { + LOG.warn("Error reading the stream " + io); + } finally { + // Close the streams + try { + fReader.close(); + try { + in.close(); + } catch (IOException i) { + LOG.warn("Error closing the stream " + in); + } + } catch (IOException i) { + LOG.warn("Error closing the stream " + fReader); + } + } + } + /** {@inheritDoc} */ @Override public long getPhysicalMemorySize() { @@ -405,6 +478,20 @@ public float getCpuUsage() { return overallCpuUsage; } + /** {@inheritDoc} */ + @Override + public long getNetworkBytesRead() { + readProcNetInfoFile(); + return numNetBytesRead; + } + + /** {@inheritDoc} */ + @Override + public long getNetworkBytesWritten() { + readProcNetInfoFile(); + return numNetBytesWritten; + } + /** * Test the {@link SysInfoLinux}. * @@ -424,6 +511,10 @@ public static void main(String[] args) { System.out.println("CPU frequency (kHz) : " + plugin.getCpuFrequency()); System.out.println("Cumulative CPU time (ms) : " + plugin.getCumulativeCpuTime()); + System.out.println("Total network read (bytes) : " + + plugin.getNetworkBytesRead()); + System.out.println("Total network written (bytes) : " + + plugin.getNetworkBytesWritten()); try { // Sleep so we can compute the CPU usage Thread.sleep(500L); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfoWindows.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfoWindows.java index da4c1c5e87..f8542a30d7 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfoWindows.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfoWindows.java @@ -178,4 +178,19 @@ public float getCpuUsage() { refreshIfNeeded(); return cpuUsage; } + + /** {@inheritDoc} */ + @Override + public long getNetworkBytesRead() { + // TODO unimplemented + return 0L; + } + + /** {@inheritDoc} */ + @Override + public long getNetworkBytesWritten() { + // TODO unimplemented + return 0L; + } + } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestSysInfoLinux.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestSysInfoLinux.java index 73edc77412..2a31f315bb 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestSysInfoLinux.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestSysInfoLinux.java @@ -44,8 +44,10 @@ static class FakeLinuxResourceCalculatorPlugin extends public FakeLinuxResourceCalculatorPlugin(String procfsMemFile, String procfsCpuFile, String procfsStatFile, + String procfsNetFile, long jiffyLengthInMillis) { - super(procfsMemFile, procfsCpuFile, procfsStatFile, jiffyLengthInMillis); + super(procfsMemFile, procfsCpuFile, procfsStatFile, procfsNetFile, + jiffyLengthInMillis); } @Override long getCurrentTime() { @@ -61,14 +63,17 @@ public void advanceTime(long adv) { private static final String FAKE_MEMFILE; private static final String FAKE_CPUFILE; private static final String FAKE_STATFILE; + private static final String FAKE_NETFILE; private static final long FAKE_JIFFY_LENGTH = 10L; static { int randomNum = (new Random()).nextInt(1000000000); FAKE_MEMFILE = TEST_ROOT_DIR + File.separator + "MEMINFO_" + randomNum; FAKE_CPUFILE = TEST_ROOT_DIR + File.separator + "CPUINFO_" + randomNum; FAKE_STATFILE = TEST_ROOT_DIR + File.separator + "STATINFO_" + randomNum; + FAKE_NETFILE = TEST_ROOT_DIR + File.separator + "NETINFO_" + randomNum; plugin = new FakeLinuxResourceCalculatorPlugin(FAKE_MEMFILE, FAKE_CPUFILE, FAKE_STATFILE, + FAKE_NETFILE, FAKE_JIFFY_LENGTH); } static final String MEMINFO_FORMAT = @@ -141,6 +146,17 @@ public void advanceTime(long adv) { "procs_running 1\n" + "procs_blocked 0\n"; + static final String NETINFO_FORMAT = + "Inter-| Receive | Transmit\n"+ + "face |bytes packets errs drop fifo frame compressed multicast|bytes packets"+ + "errs drop fifo colls carrier compressed\n"+ + " lo: 42236310 563003 0 0 0 0 0 0 42236310 563003 " + + "0 0 0 0 0 0\n"+ + " eth0: %d 3452527 0 0 0 0 0 299787 %d 1866280 0 0 " + + "0 0 0 0\n"+ + " eth1: %d 3152521 0 0 0 0 0 219781 %d 1866290 0 0 " + + "0 0 0 0\n"; + /** * Test parsing /proc/stat and /proc/cpuinfo * @throws IOException @@ -320,4 +336,26 @@ private void writeFakeCPUInfoFile(String content) throws IOException { IOUtils.closeQuietly(fWriter); } } + + /** + * Test parsing /proc/net/dev + * @throws IOException + */ + @Test + public void parsingProcNetFile() throws IOException { + long numBytesReadIntf1 = 2097172468L; + long numBytesWrittenIntf1 = 1355620114L; + long numBytesReadIntf2 = 1097172460L; + long numBytesWrittenIntf2 = 1055620110L; + File tempFile = new File(FAKE_NETFILE); + tempFile.deleteOnExit(); + FileWriter fWriter = new FileWriter(FAKE_NETFILE); + fWriter.write(String.format(NETINFO_FORMAT, + numBytesReadIntf1, numBytesWrittenIntf1, + numBytesReadIntf2, numBytesWrittenIntf2)); + fWriter.close(); + assertEquals(plugin.getNetworkBytesRead(), numBytesReadIntf1 + numBytesReadIntf2); + assertEquals(plugin.getNetworkBytesWritten(), numBytesWrittenIntf1 + numBytesWrittenIntf2); + } + } diff --git a/hadoop-tools/hadoop-gridmix/src/test/java/org/apache/hadoop/mapred/gridmix/DummyResourceCalculatorPlugin.java b/hadoop-tools/hadoop-gridmix/src/test/java/org/apache/hadoop/mapred/gridmix/DummyResourceCalculatorPlugin.java index fd4cb8302a..b86303b1d3 100644 --- a/hadoop-tools/hadoop-gridmix/src/test/java/org/apache/hadoop/mapred/gridmix/DummyResourceCalculatorPlugin.java +++ b/hadoop-tools/hadoop-gridmix/src/test/java/org/apache/hadoop/mapred/gridmix/DummyResourceCalculatorPlugin.java @@ -48,6 +48,12 @@ public class DummyResourceCalculatorPlugin extends ResourceCalculatorPlugin { "mapred.tasktracker.cumulativecputime.testing"; /** CPU usage percentage for testing */ public static final String CPU_USAGE = "mapred.tasktracker.cpuusage.testing"; + /** cumulative number of bytes read over the network */ + public static final String NETWORK_BYTES_READ = + "mapred.tasktracker.networkread.testing"; + /** cumulative number of bytes written over the network */ + public static final String NETWORK_BYTES_WRITTEN = + "mapred.tasktracker.networkwritten.testing"; /** process cumulative CPU usage time for testing */ public static final String PROC_CUMULATIVE_CPU_TIME = "mapred.tasktracker.proccumulativecputime.testing"; @@ -111,4 +117,17 @@ public long getCumulativeCpuTime() { public float getCpuUsage() { return getConf().getFloat(CPU_USAGE, -1); } + + /** {@inheritDoc} */ + @Override + public long getNetworkBytesRead() { + return getConf().getLong(NETWORK_BYTES_READ, -1); + } + + /** {@inheritDoc} */ + @Override + public long getNetworkBytesWritten() { + return getConf().getLong(NETWORK_BYTES_WRITTEN, -1); + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ResourceCalculatorPlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ResourceCalculatorPlugin.java index 5e5f1b4903..21724a9f49 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ResourceCalculatorPlugin.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ResourceCalculatorPlugin.java @@ -124,6 +124,22 @@ public float getCpuUsage() { return sys.getCpuUsage(); } + /** + * Obtain the aggregated number of bytes read over the network. + * @return total number of bytes read. + */ + public long getNetworkBytesRead() { + return sys.getNetworkBytesRead(); + } + + /** + * Obtain the aggregated number of bytes written to the network. + * @return total number of bytes written. + */ + public long getNetworkBytesWritten() { + return sys.getNetworkBytesWritten(); + } + /** * Create the ResourceCalculatorPlugin from the class name and configure it. If * class name is null, this method will try and return a memory calculator