HADOOP-12824. Collect network and disk usage on the node running Windows. Contributed by Inigo Goiri.
This commit is contained in:
parent
c4d4df8de0
commit
b2951f9fbc
@ -1158,6 +1158,9 @@ Release 2.8.0 - UNRELEASED
|
||||
HADOOP-12535. Run FileSystem contract tests with hadoop-azure.
|
||||
(Madhumita Chakraborty via cnauroth)
|
||||
|
||||
HADOOP-12824. Collect network and disk usage on the node running Windows.
|
||||
(Inigo Goiri via xyao)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
HADOOP-11785. Reduce the number of listStatus operation in distcp
|
||||
|
@ -44,6 +44,10 @@ public class SysInfoWindows extends SysInfo {
|
||||
private long cpuFrequencyKhz;
|
||||
private long cumulativeCpuTimeMs;
|
||||
private float cpuUsage;
|
||||
private long storageBytesRead;
|
||||
private long storageBytesWritten;
|
||||
private long netBytesRead;
|
||||
private long netBytesWritten;
|
||||
|
||||
private long lastRefreshTime;
|
||||
static final int REFRESH_INTERVAL_MS = 1000;
|
||||
@ -67,6 +71,10 @@ void reset() {
|
||||
cpuFrequencyKhz = -1;
|
||||
cumulativeCpuTimeMs = -1;
|
||||
cpuUsage = -1;
|
||||
storageBytesRead = -1;
|
||||
storageBytesWritten = -1;
|
||||
netBytesRead = -1;
|
||||
netBytesWritten = -1;
|
||||
}
|
||||
|
||||
String getSystemInfoInfoFromShell() {
|
||||
@ -91,7 +99,7 @@ void refreshIfNeeded() {
|
||||
reset();
|
||||
String sysInfoStr = getSystemInfoInfoFromShell();
|
||||
if (sysInfoStr != null) {
|
||||
final int sysInfoSplitCount = 7;
|
||||
final int sysInfoSplitCount = 11;
|
||||
String[] sysInfo = sysInfoStr.substring(0, sysInfoStr.indexOf("\r\n"))
|
||||
.split(",");
|
||||
if (sysInfo.length == sysInfoSplitCount) {
|
||||
@ -103,6 +111,10 @@ void refreshIfNeeded() {
|
||||
numProcessors = Integer.parseInt(sysInfo[4]);
|
||||
cpuFrequencyKhz = Long.parseLong(sysInfo[5]);
|
||||
cumulativeCpuTimeMs = Long.parseLong(sysInfo[6]);
|
||||
storageBytesRead = Long.parseLong(sysInfo[7]);
|
||||
storageBytesWritten = Long.parseLong(sysInfo[8]);
|
||||
netBytesRead = Long.parseLong(sysInfo[9]);
|
||||
netBytesWritten = Long.parseLong(sysInfo[10]);
|
||||
if (lastCumCpuTimeMs != -1) {
|
||||
/**
|
||||
* This number will be the aggregated usage across all cores in
|
||||
@ -203,27 +215,27 @@ public float getNumVCoresUsed() {
|
||||
/** {@inheritDoc} */
|
||||
@Override
|
||||
public long getNetworkBytesRead() {
|
||||
// TODO unimplemented
|
||||
return 0L;
|
||||
refreshIfNeeded();
|
||||
return netBytesRead;
|
||||
}
|
||||
|
||||
/** {@inheritDoc} */
|
||||
@Override
|
||||
public long getNetworkBytesWritten() {
|
||||
// TODO unimplemented
|
||||
return 0L;
|
||||
refreshIfNeeded();
|
||||
return netBytesWritten;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getStorageBytesRead() {
|
||||
// TODO unimplemented
|
||||
return 0L;
|
||||
refreshIfNeeded();
|
||||
return storageBytesRead;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getStorageBytesWritten() {
|
||||
// TODO unimplemented
|
||||
return 0L;
|
||||
refreshIfNeeded();
|
||||
return storageBytesWritten;
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -18,6 +18,8 @@
|
||||
#include "winutils.h"
|
||||
#include <psapi.h>
|
||||
#include <PowrProf.h>
|
||||
#include <pdh.h>
|
||||
#include <pdhmsg.h>
|
||||
|
||||
#ifdef PSAPI_VERSION
|
||||
#undef PSAPI_VERSION
|
||||
@ -25,6 +27,12 @@
|
||||
#define PSAPI_VERSION 1
|
||||
#pragma comment(lib, "psapi.lib")
|
||||
#pragma comment(lib, "Powrprof.lib")
|
||||
#pragma comment(lib, "pdh.lib")
|
||||
|
||||
CONST PWSTR COUNTER_PATH_NET_READ_ALL = L"\\Network Interface(*)\\Bytes Received/Sec";
|
||||
CONST PWSTR COUNTER_PATH_NET_WRITE_ALL = L"\\Network Interface(*)\\Bytes Sent/Sec";
|
||||
CONST PWSTR COUNTER_PATH_DISK_READ_ALL = L"\\LogicalDisk(*)\\Disk Read Bytes/sec";
|
||||
CONST PWSTR COUNTER_PATH_DISK_WRITE_ALL = L"\\LogicalDisk(*)\\Disk Write Bytes/sec";
|
||||
|
||||
typedef struct _PROCESSOR_POWER_INFORMATION {
|
||||
ULONG Number;
|
||||
@ -57,6 +65,7 @@ int SystemInfo()
|
||||
PROCESSOR_POWER_INFORMATION const *ppi;
|
||||
ULONGLONG cpuFrequencyKhz;
|
||||
NTSTATUS status;
|
||||
LONGLONG diskRead, diskWrite, netRead, netWrite;
|
||||
|
||||
ZeroMemory(&memInfo, sizeof(PERFORMANCE_INFORMATION));
|
||||
memInfo.cb = sizeof(PERFORMANCE_INFORMATION);
|
||||
@ -105,8 +114,16 @@ int SystemInfo()
|
||||
cpuFrequencyKhz = ppi->MaxMhz*1000;
|
||||
LocalFree(pBuffer);
|
||||
|
||||
fwprintf_s(stdout, L"%Iu,%Iu,%Iu,%Iu,%u,%I64u,%I64u\n", vmemSize, memSize,
|
||||
vmemFree, memFree, sysInfo.dwNumberOfProcessors, cpuFrequencyKhz, cpuTimeMs);
|
||||
status = GetDiskAndNetwork(&diskRead, &diskWrite, &netRead, &netWrite);
|
||||
if(0 != status)
|
||||
{
|
||||
fwprintf_s(stderr, L"Error in GetDiskAndNetwork. Err:%d\n", status);
|
||||
return EXIT_FAILURE;
|
||||
}
|
||||
|
||||
fwprintf_s(stdout, L"%Iu,%Iu,%Iu,%Iu,%u,%I64u,%I64u,%I64d,%I64d,%I64d,%I64d\n", vmemSize, memSize,
|
||||
vmemFree, memFree, sysInfo.dwNumberOfProcessors, cpuFrequencyKhz, cpuTimeMs,
|
||||
diskRead, diskWrite, netRead, netWrite);
|
||||
|
||||
return EXIT_SUCCESS;
|
||||
}
|
||||
@ -120,5 +137,151 @@ void SystemInfoUsage()
|
||||
VirtualMemorySize(bytes),PhysicalMemorySize(bytes),\n\
|
||||
FreeVirtualMemory(bytes),FreePhysicalMemory(bytes),\n\
|
||||
NumberOfProcessors,CpuFrequency(Khz),\n\
|
||||
CpuTime(MilliSec,Kernel+User)\n");
|
||||
CpuTime(MilliSec,Kernel+User),\n\
|
||||
DiskRead(bytes),DiskWrite(bytes),\n\
|
||||
NetworkRead(bytes),NetworkWrite(bytes)\n");
|
||||
}
|
||||
|
||||
/**
 * Collects the aggregated disk and network byte counters through the PDH
 * (Performance Data Helper) API.
 *
 * A single query is opened, the four wildcard counter paths (network
 * received/sent, logical disk read/write) are added to it, one sample is
 * collected and each counter is folded into one total by ReadTotalCounter().
 *
 * Every output is set to -1 on entry, so a caller never observes an
 * indeterminate value when the function fails before a counter is read.
 * A failure while setting up the query aborts the collection; a failure while
 * reading one counter is reported on stderr and the remaining counters are
 * still read.
 *
 * @param diskRead   receives the total for COUNTER_PATH_DISK_READ_ALL
 * @param diskWrite  receives the total for COUNTER_PATH_DISK_WRITE_ALL
 * @param netRead    receives the total for COUNTER_PATH_NET_READ_ALL
 * @param netWrite   receives the total for COUNTER_PATH_NET_WRITE_ALL
 * @return EXIT_SUCCESS when every step succeeded, EXIT_FAILURE otherwise.
 */
int GetDiskAndNetwork(LONGLONG* diskRead, LONGLONG* diskWrite, LONGLONG* netRead, LONGLONG* netWrite)
{
  int ret = EXIT_SUCCESS;
  PDH_STATUS status = ERROR_SUCCESS;
  PDH_HQUERY hQuery = NULL;
  PDH_HCOUNTER hCounterNetRead = NULL;
  PDH_HCOUNTER hCounterNetWrite = NULL;
  PDH_HCOUNTER hCounterDiskRead = NULL;
  PDH_HCOUNTER hCounterDiskWrite = NULL;

  // Give every output a defined "unavailable" value before anything can fail
  *diskRead = -1;
  *diskWrite = -1;
  *netRead = -1;
  *netWrite = -1;

  status = PdhOpenQuery(NULL, 0, &hQuery);
  if (ERROR_SUCCESS != status)
  {
    fwprintf_s(stderr, L"PdhOpenQuery failed with 0x%x.\n", status);
    ret = EXIT_FAILURE;
    goto cleanup;
  }

  // Add each one of the counters with wild cards
  status = PdhAddCounter(hQuery, COUNTER_PATH_NET_READ_ALL, 0, &hCounterNetRead);
  if (ERROR_SUCCESS != status)
  {
    fwprintf_s(stderr, L"PdhAddCounter %s failed with 0x%x.\n", COUNTER_PATH_NET_READ_ALL, status);
    ret = EXIT_FAILURE;
    goto cleanup;
  }

  status = PdhAddCounter(hQuery, COUNTER_PATH_NET_WRITE_ALL, 0, &hCounterNetWrite);
  if (ERROR_SUCCESS != status)
  {
    fwprintf_s(stderr, L"PdhAddCounter %s failed with 0x%x.\n", COUNTER_PATH_NET_WRITE_ALL, status);
    ret = EXIT_FAILURE;
    goto cleanup;
  }

  status = PdhAddCounter(hQuery, COUNTER_PATH_DISK_READ_ALL, 0, &hCounterDiskRead);
  if (ERROR_SUCCESS != status)
  {
    fwprintf_s(stderr, L"PdhAddCounter %s failed with 0x%x.\n", COUNTER_PATH_DISK_READ_ALL, status);
    ret = EXIT_FAILURE;
    goto cleanup;
  }

  status = PdhAddCounter(hQuery, COUNTER_PATH_DISK_WRITE_ALL, 0, &hCounterDiskWrite);
  if (ERROR_SUCCESS != status)
  {
    fwprintf_s(stderr, L"PdhAddCounter %s failed with 0x%x.\n", COUNTER_PATH_DISK_WRITE_ALL, status);
    ret = EXIT_FAILURE;
    goto cleanup;
  }

  // Take the one sample that all four counters are read from
  status = PdhCollectQueryData(hQuery);
  if (ERROR_SUCCESS != status)
  {
    fwprintf_s(stderr, L"PdhCollectQueryData() failed with 0x%x.\n", status);
    ret = EXIT_FAILURE;
    goto cleanup;
  }

  // Read and aggregate counters; a failed read does not stop the others
  status = ReadTotalCounter(hCounterNetRead, netRead);
  if (ERROR_SUCCESS != status)
  {
    fwprintf_s(stderr, L"ReadTotalCounter(Network Read): Error 0x%x.\n", status);
    ret = EXIT_FAILURE;
  }

  status = ReadTotalCounter(hCounterNetWrite, netWrite);
  if (ERROR_SUCCESS != status)
  {
    fwprintf_s(stderr, L"ReadTotalCounter(Network Write): Error 0x%x.\n", status);
    ret = EXIT_FAILURE;
  }

  status = ReadTotalCounter(hCounterDiskRead, diskRead);
  if (ERROR_SUCCESS != status)
  {
    fwprintf_s(stderr, L"ReadTotalCounter(Disk Read): Error 0x%x.\n", status);
    ret = EXIT_FAILURE;
  }

  status = ReadTotalCounter(hCounterDiskWrite, diskWrite);
  if (ERROR_SUCCESS != status)
  {
    fwprintf_s(stderr, L"ReadTotalCounter(Disk Write): Error 0x%x.\n", status);
    ret = EXIT_FAILURE;
  }

cleanup:
  if (hQuery)
  {
    // Best-effort close: its status does not change the result already computed
    PdhCloseQuery(hQuery);
  }

  return ret;
}
|
||||
|
||||
/**
 * Reads all instances of a wildcard PDH counter and folds them into one total.
 *
 * The raw counter array is fetched in two steps: a first call with no buffer
 * to learn the required size, then a second call that fills the buffer.
 * The raw FirstValue of every instance is summed; if an instance named
 * "_Total" is encountered, its value replaces the running sum and the scan
 * stops.
 *
 * @param hCounter  counter handle that was added to an already collected query
 * @param ret       receives the total on success and -1 on any failure
 * @return ERROR_SUCCESS when a total was produced. Otherwise the PDH status of
 *         the failing call, PDH_MEMORY_ALLOCATION_FAILURE when the item buffer
 *         cannot be allocated, or PDH_NO_DATA when the size probe reports
 *         success without asking for a buffer (so -1 is never paired with a
 *         success status).
 */
PDH_STATUS ReadTotalCounter(PDH_HCOUNTER hCounter, LONGLONG* ret)
{
  PDH_STATUS status = ERROR_SUCCESS;
  DWORD i = 0;
  DWORD dwBufferSize = 0;
  DWORD dwItemCount = 0;
  PDH_RAW_COUNTER_ITEM *pItems = NULL;
  LONGLONG total = 0;

  // The output reads as "unavailable" until a total has been computed
  *ret = -1;

  // Get the required size of the pItems buffer
  status = PdhGetRawCounterArray(hCounter, &dwBufferSize, &dwItemCount, NULL);
  if (PDH_MORE_DATA != status)
  {
    if (ERROR_SUCCESS == status)
    {
      // Nothing to read: do not report success together with -1
      status = PDH_NO_DATA;
    }
    goto cleanup;
  }

  pItems = (PDH_RAW_COUNTER_ITEM *) malloc(dwBufferSize);
  if (NULL == pItems)
  {
    status = PDH_MEMORY_ALLOCATION_FAILURE;
    goto cleanup;
  }

  // Actually query the counter
  status = PdhGetRawCounterArray(hCounter, &dwBufferSize, &dwItemCount, pItems);
  if (ERROR_SUCCESS != status)
  {
    goto cleanup;
  }

  for (i = 0; i < dwItemCount; i++)
  {
    if (wcscmp(L"_Total", pItems[i].szName) == 0)
    {
      // An explicit aggregate instance overrides whatever was summed so far
      total = pItems[i].RawValue.FirstValue;
      break;
    }
    total += pItems[i].RawValue.FirstValue;
  }
  *ret = total;

cleanup:
  // free(NULL) is a no-op, so this is safe on every path
  free(pItems);

  return status;
}
|
@ -47,7 +47,8 @@ long now() {
|
||||
public void parseSystemInfoString() {
|
||||
SysInfoWindowsMock tester = new SysInfoWindowsMock();
|
||||
tester.setSysinfoString(
|
||||
"17177038848,8589467648,15232745472,6400417792,1,2805000,6261812\r\n");
|
||||
"17177038848,8589467648,15232745472,6400417792,1,2805000,6261812," +
|
||||
"1234567,2345678,3456789,4567890\r\n");
|
||||
// info str derived from windows shell command has \r\n termination
|
||||
assertEquals(17177038848L, tester.getVirtualMemorySize());
|
||||
assertEquals(8589467648L, tester.getPhysicalMemorySize());
|
||||
@ -57,6 +58,10 @@ public void parseSystemInfoString() {
|
||||
assertEquals(1, tester.getNumCores());
|
||||
assertEquals(2805000L, tester.getCpuFrequency());
|
||||
assertEquals(6261812L, tester.getCumulativeCpuTime());
|
||||
assertEquals(1234567L, tester.getStorageBytesRead());
|
||||
assertEquals(2345678L, tester.getStorageBytesWritten());
|
||||
assertEquals(3456789L, tester.getNetworkBytesRead());
|
||||
assertEquals(4567890L, tester.getNetworkBytesWritten());
|
||||
// undef on first call
|
||||
assertEquals((float)CpuTimeTracker.UNAVAILABLE,
|
||||
tester.getCpuUsagePercentage(), 0.0);
|
||||
@ -68,7 +73,8 @@ public void parseSystemInfoString() {
|
||||
public void refreshAndCpuUsage() throws InterruptedException {
|
||||
SysInfoWindowsMock tester = new SysInfoWindowsMock();
|
||||
tester.setSysinfoString(
|
||||
"17177038848,8589467648,15232745472,6400417792,1,2805000,6261812\r\n");
|
||||
"17177038848,8589467648,15232745472,6400417792,1,2805000,6261812," +
|
||||
"1234567,2345678,3456789,4567890\r\n");
|
||||
// info str derived from windows shell command has \r\n termination
|
||||
tester.getAvailablePhysicalMemorySize();
|
||||
// verify information has been refreshed
|
||||
@ -79,7 +85,8 @@ public void refreshAndCpuUsage() throws InterruptedException {
|
||||
tester.getNumVCoresUsed(), 0.0);
|
||||
|
||||
tester.setSysinfoString(
|
||||
"17177038848,8589467648,15232745472,5400417792,1,2805000,6263012\r\n");
|
||||
"17177038848,8589467648,15232745472,5400417792,1,2805000,6263012," +
|
||||
"1234567,2345678,3456789,4567890\r\n");
|
||||
tester.getAvailablePhysicalMemorySize();
|
||||
// verify information has not been refreshed
|
||||
assertEquals(6400417792L, tester.getAvailablePhysicalMemorySize());
|
||||
@ -106,12 +113,14 @@ public void refreshAndCpuUsageMulticore() throws InterruptedException {
|
||||
// test with 12 cores
|
||||
SysInfoWindowsMock tester = new SysInfoWindowsMock();
|
||||
tester.setSysinfoString(
|
||||
"17177038848,8589467648,15232745472,6400417792,12,2805000,6261812\r\n");
|
||||
"17177038848,8589467648,15232745472,6400417792,12,2805000,6261812," +
|
||||
"1234567,2345678,3456789,4567890\r\n");
|
||||
// verify information has been refreshed
|
||||
assertEquals(6400417792L, tester.getAvailablePhysicalMemorySize());
|
||||
|
||||
tester.setSysinfoString(
|
||||
"17177038848,8589467648,15232745472,5400417792,12,2805000,6263012\r\n");
|
||||
"17177038848,8589467648,15232745472,5400417792,12,2805000,6263012," +
|
||||
"1234567,2345678,3456789,4567890\r\n");
|
||||
// verify information has not been refreshed
|
||||
assertEquals(6400417792L, tester.getAvailablePhysicalMemorySize());
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user