From b2951f9fbccee8aeab04c1f5ee3fc6db1ef6b2da Mon Sep 17 00:00:00 2001 From: Xiaoyu Yao Date: Thu, 25 Feb 2016 15:46:53 -0800 Subject: [PATCH] HADOOP-12824. Collect network and disk usage on the node running Windows. Contributed by Inigo Goiri. --- .../hadoop-common/CHANGES.txt | 3 + .../apache/hadoop/util/SysInfoWindows.java | 30 +++- .../src/main/winutils/systeminfo.c | 169 +++++++++++++++++- .../hadoop/util/TestSysInfoWindows.java | 19 +- 4 files changed, 204 insertions(+), 17 deletions(-) diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index c91820edcf..e5ce0eea4c 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -1158,6 +1158,9 @@ Release 2.8.0 - UNRELEASED HADOOP-12535. Run FileSystem contract tests with hadoop-azure. (Madhumita Chakraborty via cnauroth) + HADOOP-12824. Collect network and disk usage on the node running Windows. + (Inigo Goiri via xyao) + OPTIMIZATIONS HADOOP-11785. Reduce the number of listStatus operation in distcp diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfoWindows.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfoWindows.java index b65569b8cd..de0c43bcef 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfoWindows.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfoWindows.java @@ -44,6 +44,10 @@ public class SysInfoWindows extends SysInfo { private long cpuFrequencyKhz; private long cumulativeCpuTimeMs; private float cpuUsage; + private long storageBytesRead; + private long storageBytesWritten; + private long netBytesRead; + private long netBytesWritten; private long lastRefreshTime; static final int REFRESH_INTERVAL_MS = 1000; @@ -67,6 +71,10 @@ void reset() { cpuFrequencyKhz = -1; cumulativeCpuTimeMs = -1; cpuUsage = -1; + storageBytesRead = -1; + storageBytesWritten = -1; + netBytesRead = -1; + netBytesWritten = -1; } String getSystemInfoInfoFromShell() { @@ -91,7 +99,7 @@ void refreshIfNeeded() { reset(); String sysInfoStr = getSystemInfoInfoFromShell(); if (sysInfoStr != null) { - final int sysInfoSplitCount = 7; + final int sysInfoSplitCount = 11; String[] sysInfo = sysInfoStr.substring(0, sysInfoStr.indexOf("\r\n")) .split(","); if (sysInfo.length == sysInfoSplitCount) { @@ -103,6 +111,10 @@ void refreshIfNeeded() { numProcessors = Integer.parseInt(sysInfo[4]); cpuFrequencyKhz = Long.parseLong(sysInfo[5]); cumulativeCpuTimeMs = Long.parseLong(sysInfo[6]); + storageBytesRead = Long.parseLong(sysInfo[7]); + storageBytesWritten = Long.parseLong(sysInfo[8]); + netBytesRead = Long.parseLong(sysInfo[9]); + netBytesWritten = Long.parseLong(sysInfo[10]); if (lastCumCpuTimeMs != -1) { /** * This number will be the aggregated usage across all cores in @@ -203,27 +215,27 @@ public float getNumVCoresUsed() { /** {@inheritDoc} */ @Override public long getNetworkBytesRead() { - // TODO unimplemented - return 0L; + refreshIfNeeded(); + return netBytesRead; } /** {@inheritDoc} */ @Override public long getNetworkBytesWritten() { - // TODO unimplemented - return 0L; + refreshIfNeeded(); + return netBytesWritten; } @Override public long getStorageBytesRead() { - // TODO unimplemented - return 0L; + refreshIfNeeded(); + return storageBytesRead; } @Override public long getStorageBytesWritten() { - // TODO unimplemented - return 0L; + refreshIfNeeded(); + return storageBytesWritten; } } diff --git a/hadoop-common-project/hadoop-common/src/main/winutils/systeminfo.c b/hadoop-common-project/hadoop-common/src/main/winutils/systeminfo.c index 48f03ed3b6..b7093a7b7d 100644 --- a/hadoop-common-project/hadoop-common/src/main/winutils/systeminfo.c +++ b/hadoop-common-project/hadoop-common/src/main/winutils/systeminfo.c @@ -18,6 +18,8 @@ #include "winutils.h" #include #include +#include +#include #ifdef PSAPI_VERSION #undef PSAPI_VERSION @@ -25,6 +27,12 @@ #define PSAPI_VERSION 1 #pragma comment(lib, "psapi.lib") #pragma comment(lib, "Powrprof.lib") +#pragma comment(lib, "pdh.lib") + +CONST PWSTR COUNTER_PATH_NET_READ_ALL = L"\\Network Interface(*)\\Bytes Received/Sec"; +CONST PWSTR COUNTER_PATH_NET_WRITE_ALL = L"\\Network Interface(*)\\Bytes Sent/Sec"; +CONST PWSTR COUNTER_PATH_DISK_READ_ALL = L"\\LogicalDisk(*)\\Disk Read Bytes/sec"; +CONST PWSTR COUNTER_PATH_DISK_WRITE_ALL = L"\\LogicalDisk(*)\\Disk Write Bytes/sec"; typedef struct _PROCESSOR_POWER_INFORMATION { ULONG Number; @@ -57,6 +65,7 @@ int SystemInfo() PROCESSOR_POWER_INFORMATION const *ppi; ULONGLONG cpuFrequencyKhz; NTSTATUS status; + LONGLONG diskRead, diskWrite, netRead, netWrite; ZeroMemory(&memInfo, sizeof(PERFORMANCE_INFORMATION)); memInfo.cb = sizeof(PERFORMANCE_INFORMATION); @@ -105,8 +114,16 @@ int SystemInfo() cpuFrequencyKhz = ppi->MaxMhz*1000; LocalFree(pBuffer); - fwprintf_s(stdout, L"%Iu,%Iu,%Iu,%Iu,%u,%I64u,%I64u\n", vmemSize, memSize, - vmemFree, memFree, sysInfo.dwNumberOfProcessors, cpuFrequencyKhz, cpuTimeMs); + status = GetDiskAndNetwork(&diskRead, &diskWrite, &netRead, &netWrite); + if(0 != status) + { + fwprintf_s(stderr, L"Error in GetDiskAndNetwork. Err:%d\n", status); + return EXIT_FAILURE; + } + + fwprintf_s(stdout, L"%Iu,%Iu,%Iu,%Iu,%u,%I64u,%I64u,%I64d,%I64d,%I64d,%I64d\n", vmemSize, memSize, + vmemFree, memFree, sysInfo.dwNumberOfProcessors, cpuFrequencyKhz, cpuTimeMs, + diskRead, diskWrite, netRead, netWrite); return EXIT_SUCCESS; } @@ -120,5 +137,151 @@ void SystemInfoUsage() VirtualMemorySize(bytes),PhysicalMemorySize(bytes),\n\ FreeVirtualMemory(bytes),FreePhysicalMemory(bytes),\n\ NumberOfProcessors,CpuFrequency(Khz),\n\ - CpuTime(MilliSec,Kernel+User)\n"); + CpuTime(MilliSec,Kernel+User),\n\ + DiskRead(bytes),DiskWrite(bytes),\n\ + NetworkRead(bytes),NetworkWrite(bytes)\n"); +} + +int GetDiskAndNetwork(LONGLONG* diskRead, LONGLONG* diskWrite, LONGLONG* netRead, LONGLONG* netWrite) +{ + int ret = EXIT_SUCCESS; + PDH_STATUS status = ERROR_SUCCESS; + PDH_HQUERY hQuery = NULL; + PDH_HCOUNTER hCounterNetRead = NULL; + PDH_HCOUNTER hCounterNetWrite = NULL; + PDH_HCOUNTER hCounterDiskRead = NULL; + PDH_HCOUNTER hCounterDiskWrite = NULL; + DWORD i; + + if(status = PdhOpenQuery(NULL, 0, &hQuery)) + { + fwprintf_s(stderr, L"PdhOpenQuery failed with 0x%x.\n", status); + ret = EXIT_FAILURE; + goto cleanup; + } + + // Add each one of the counters with wild cards + if(status = PdhAddCounter(hQuery, COUNTER_PATH_NET_READ_ALL, 0, &hCounterNetRead)) + { + fwprintf_s(stderr, L"PdhAddCounter %s failed with 0x%x.\n", COUNTER_PATH_NET_READ_ALL, status); + ret = EXIT_FAILURE; + goto cleanup; + } + if(status = PdhAddCounter(hQuery, COUNTER_PATH_NET_WRITE_ALL, 0, &hCounterNetWrite)) + { + fwprintf_s(stderr, L"PdhAddCounter %s failed with 0x%x.\n", COUNTER_PATH_NET_WRITE_ALL, status); + ret = EXIT_FAILURE; + goto cleanup; + } + if(status = PdhAddCounter(hQuery, COUNTER_PATH_DISK_READ_ALL, 0, &hCounterDiskRead)) + { + fwprintf_s(stderr, L"PdhAddCounter %s failed with 0x%x.\n", COUNTER_PATH_DISK_READ_ALL, status); + ret = EXIT_FAILURE; + goto cleanup; + } + if(status = PdhAddCounter(hQuery, COUNTER_PATH_DISK_WRITE_ALL, 0, &hCounterDiskWrite)) + { + fwprintf_s(stderr, L"PdhAddCounter %s failed with 0x%x.\n", COUNTER_PATH_DISK_WRITE_ALL, status); + ret = EXIT_FAILURE; + goto cleanup; + } + + if(status = PdhCollectQueryData(hQuery)) + { + fwprintf_s(stderr, L"PdhCollectQueryData() failed with 0x%x.\n", status); + ret = EXIT_FAILURE; + goto cleanup; + } + + // Read and aggregate counters + status = ReadTotalCounter(hCounterNetRead, netRead); + if(ERROR_SUCCESS != status) + { + fwprintf_s(stderr, L"ReadTotalCounter(Network Read): Error 0x%x.\n", status); + ret = EXIT_FAILURE; + } + + status = ReadTotalCounter(hCounterNetWrite, netWrite); + if(ERROR_SUCCESS != status) + { + fwprintf_s(stderr, L"ReadTotalCounter(Network Write): Error 0x%x.\n", status); + ret = EXIT_FAILURE; + } + + status = ReadTotalCounter(hCounterDiskRead, diskRead); + if(ERROR_SUCCESS != status) + { + fwprintf_s(stderr, L"ReadTotalCounter(Disk Read): Error 0x%x.\n", status); + ret = EXIT_FAILURE; + } + + status = ReadTotalCounter(hCounterDiskWrite, diskWrite); + if(ERROR_SUCCESS != status) + { + fwprintf_s(stderr, L"ReadTotalCounter(Disk Write): Error 0x%x.\n", status); + ret = EXIT_FAILURE; + } + +cleanup: + if (hQuery) + { + status = PdhCloseQuery(hQuery); + } + + return ret; +} + +PDH_STATUS ReadTotalCounter(PDH_HCOUNTER hCounter, LONGLONG* ret) +{ + PDH_STATUS status = ERROR_SUCCESS; + DWORD i = 0; + DWORD dwBufferSize = 0; + DWORD dwItemCount = 0; + PDH_RAW_COUNTER_ITEM *pItems = NULL; + + // Initialize output + *ret = 0; + + // Get the required size of the pItems buffer + status = PdhGetRawCounterArray(hCounter, &dwBufferSize, &dwItemCount, NULL); + if (PDH_MORE_DATA == status) + { + pItems = (PDH_RAW_COUNTER_ITEM *) malloc(dwBufferSize); + if (pItems) + { + // Actually query the counter + status = PdhGetRawCounterArray(hCounter, &dwBufferSize, &dwItemCount, pItems); + if (ERROR_SUCCESS == status) { + for (i = 0; i < dwItemCount; i++) { + if (wcscmp(L"_Total", pItems[i].szName) == 0) { + *ret = pItems[i].RawValue.FirstValue; + break; + } else { + *ret += pItems[i].RawValue.FirstValue; + } + } + } else { + *ret = -1; + goto cleanup; + } + // Reset structures + free(pItems); + pItems = NULL; + dwBufferSize = dwItemCount = 0; + } else { + *ret = -1; + status = PDH_NO_DATA; + goto cleanup; + } + } else { + *ret = -1; + goto cleanup; + } + +cleanup: + if (pItems) { + free(pItems); + } + + return status; } \ No newline at end of file diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestSysInfoWindows.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestSysInfoWindows.java index 2544e7cfbd..555157678c 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestSysInfoWindows.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestSysInfoWindows.java @@ -47,7 +47,8 @@ long now() { public void parseSystemInfoString() { SysInfoWindowsMock tester = new SysInfoWindowsMock(); tester.setSysinfoString( - "17177038848,8589467648,15232745472,6400417792,1,2805000,6261812\r\n"); + "17177038848,8589467648,15232745472,6400417792,1,2805000,6261812," + + "1234567,2345678,3456789,4567890\r\n"); // info str derived from windows shell command has \r\n termination assertEquals(17177038848L, tester.getVirtualMemorySize()); assertEquals(8589467648L, tester.getPhysicalMemorySize()); @@ -57,6 +58,10 @@ public void parseSystemInfoString() { assertEquals(1, tester.getNumCores()); assertEquals(2805000L, tester.getCpuFrequency()); assertEquals(6261812L, tester.getCumulativeCpuTime()); + assertEquals(1234567L, tester.getStorageBytesRead()); + assertEquals(2345678L, tester.getStorageBytesWritten()); + assertEquals(3456789L, tester.getNetworkBytesRead()); + assertEquals(4567890L, tester.getNetworkBytesWritten()); // undef on first call assertEquals((float)CpuTimeTracker.UNAVAILABLE, tester.getCpuUsagePercentage(), 0.0); @@ -68,7 +73,8 @@ public void parseSystemInfoString() { public void refreshAndCpuUsage() throws InterruptedException { SysInfoWindowsMock tester = new SysInfoWindowsMock(); tester.setSysinfoString( - "17177038848,8589467648,15232745472,6400417792,1,2805000,6261812\r\n"); + "17177038848,8589467648,15232745472,6400417792,1,2805000,6261812," + + "1234567,2345678,3456789,4567890\r\n"); // info str derived from windows shell command has \r\n termination tester.getAvailablePhysicalMemorySize(); // verify information has been refreshed @@ -79,7 +85,8 @@ public void refreshAndCpuUsage() throws InterruptedException { tester.getNumVCoresUsed(), 0.0); tester.setSysinfoString( - "17177038848,8589467648,15232745472,5400417792,1,2805000,6263012\r\n"); + "17177038848,8589467648,15232745472,5400417792,1,2805000,6263012," + + "1234567,2345678,3456789,4567890\r\n"); tester.getAvailablePhysicalMemorySize(); // verify information has not been refreshed assertEquals(6400417792L, tester.getAvailablePhysicalMemorySize()); @@ -106,12 +113,14 @@ public void refreshAndCpuUsageMulticore() throws InterruptedException { // test with 12 cores SysInfoWindowsMock tester = new SysInfoWindowsMock(); tester.setSysinfoString( - "17177038848,8589467648,15232745472,6400417792,12,2805000,6261812\r\n"); + "17177038848,8589467648,15232745472,6400417792,12,2805000,6261812," + + "1234567,2345678,3456789,4567890\r\n"); // verify information has been refreshed assertEquals(6400417792L, tester.getAvailablePhysicalMemorySize()); tester.setSysinfoString( - "17177038848,8589467648,15232745472,5400417792,12,2805000,6263012\r\n"); + "17177038848,8589467648,15232745472,5400417792,12,2805000,6263012," + + "1234567,2345678,3456789,4567890\r\n"); // verify information has not been refreshed assertEquals(6400417792L, tester.getAvailablePhysicalMemorySize());