HDFS-11531. Expose hedged read metrics via libHDFS API. Contributed by Sailesh Mukil.
This commit is contained in:
parent
af8e9842d2
commit
8c81a16a1f
@ -200,6 +200,15 @@ public class DistributedFileSystem extends FileSystem {
|
||||
+ dfs.ugi.getShortUserName()));
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the hedged read metrics object for this client.
|
||||
*
|
||||
* @return object of DFSHedgedReadMetrics
|
||||
*/
|
||||
public DFSHedgedReadMetrics getHedgedReadMetrics() {
|
||||
return dfs.getHedgedReadMetrics();
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks that the passed URI belongs to this filesystem and returns
|
||||
* just the path component. Expects a URI with an absolute path.
|
||||
|
@ -145,6 +145,7 @@ static int doTestHdfsOperations(struct tlhThreadInfo *ti, hdfsFS fs,
|
||||
int ret, expected, numEntries;
|
||||
hdfsFileInfo *fileInfo;
|
||||
struct hdfsReadStatistics *readStats = NULL;
|
||||
struct hdfsHedgedReadMetrics *hedgedMetrics = NULL;
|
||||
|
||||
if (hdfsExists(fs, paths->prefix) == 0) {
|
||||
EXPECT_ZERO(hdfsDelete(fs, paths->prefix, 1));
|
||||
@ -204,6 +205,15 @@ static int doTestHdfsOperations(struct tlhThreadInfo *ti, hdfsFS fs,
|
||||
EXPECT_UINT64_EQ(UINT64_C(0), readStats->totalLocalBytesRead);
|
||||
EXPECT_UINT64_EQ(UINT64_C(0), readStats->totalShortCircuitBytesRead);
|
||||
hdfsFileFreeReadStatistics(readStats);
|
||||
|
||||
/* Verify that we can retrieve the hedged read metrics */
|
||||
EXPECT_ZERO(hdfsGetHedgedReadMetrics(fs, &hedgedMetrics));
|
||||
errno = 0;
|
||||
EXPECT_UINT64_EQ(UINT64_C(0), hedgedMetrics->hedgedReadOps);
|
||||
EXPECT_UINT64_EQ(UINT64_C(0), hedgedMetrics->hedgedReadOpsWin);
|
||||
EXPECT_UINT64_EQ(UINT64_C(0), hedgedMetrics->hedgedReadOpsInCurThread);
|
||||
hdfsFreeHedgedReadMetrics(hedgedMetrics);
|
||||
|
||||
/* TODO: implement readFully and use it here */
|
||||
ret = hdfsRead(fs, file, tmp, sizeof(tmp));
|
||||
if (ret < 0) {
|
||||
|
@ -93,6 +93,87 @@ int hdfsFileIsOpenForRead(hdfsFile file)
|
||||
return (file->type == HDFS_STREAM_INPUT);
|
||||
}
|
||||
|
||||
int hdfsGetHedgedReadMetrics(hdfsFS fs, struct hdfsHedgedReadMetrics **metrics)
|
||||
{
|
||||
jthrowable jthr;
|
||||
jobject hedgedReadMetrics = NULL;
|
||||
jvalue jVal;
|
||||
struct hdfsHedgedReadMetrics *m = NULL;
|
||||
int ret;
|
||||
jobject jFS = (jobject)fs;
|
||||
JNIEnv* env = getJNIEnv();
|
||||
|
||||
if (env == NULL) {
|
||||
errno = EINTERNAL;
|
||||
return -1;
|
||||
}
|
||||
|
||||
jthr = invokeMethod(env, &jVal, INSTANCE, jFS,
|
||||
HADOOP_DFS,
|
||||
"getHedgedReadMetrics",
|
||||
"()Lorg/apache/hadoop/hdfs/DFSHedgedReadMetrics;");
|
||||
if (jthr) {
|
||||
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
|
||||
"hdfsGetHedgedReadMetrics: getHedgedReadMetrics failed");
|
||||
goto done;
|
||||
}
|
||||
hedgedReadMetrics = jVal.l;
|
||||
|
||||
m = malloc(sizeof(struct hdfsHedgedReadMetrics));
|
||||
if (!m) {
|
||||
ret = ENOMEM;
|
||||
goto done;
|
||||
}
|
||||
|
||||
jthr = invokeMethod(env, &jVal, INSTANCE, hedgedReadMetrics,
|
||||
"org/apache/hadoop/hdfs/DFSHedgedReadMetrics",
|
||||
"getHedgedReadOps", "()J");
|
||||
if (jthr) {
|
||||
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
|
||||
"hdfsGetHedgedReadStatistics: getHedgedReadOps failed");
|
||||
goto done;
|
||||
}
|
||||
m->hedgedReadOps = jVal.j;
|
||||
|
||||
jthr = invokeMethod(env, &jVal, INSTANCE, hedgedReadMetrics,
|
||||
"org/apache/hadoop/hdfs/DFSHedgedReadMetrics",
|
||||
"getHedgedReadWins", "()J");
|
||||
if (jthr) {
|
||||
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
|
||||
"hdfsGetHedgedReadStatistics: getHedgedReadWins failed");
|
||||
goto done;
|
||||
}
|
||||
m->hedgedReadOpsWin = jVal.j;
|
||||
|
||||
jthr = invokeMethod(env, &jVal, INSTANCE, hedgedReadMetrics,
|
||||
"org/apache/hadoop/hdfs/DFSHedgedReadMetrics",
|
||||
"getHedgedReadOpsInCurThread", "()J");
|
||||
if (jthr) {
|
||||
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
|
||||
"hdfsGetHedgedReadStatistics: getHedgedReadOpsInCurThread failed");
|
||||
goto done;
|
||||
}
|
||||
m->hedgedReadOpsInCurThread = jVal.j;
|
||||
|
||||
*metrics = m;
|
||||
m = NULL;
|
||||
ret = 0;
|
||||
|
||||
done:
|
||||
destroyLocalReference(env, hedgedReadMetrics);
|
||||
free(m);
|
||||
if (ret) {
|
||||
errno = ret;
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
void hdfsFreeHedgedReadMetrics(struct hdfsHedgedReadMetrics *metrics)
|
||||
{
|
||||
free(metrics);
|
||||
}
|
||||
|
||||
int hdfsFileGetReadStatistics(hdfsFile file,
|
||||
struct hdfsReadStatistics **stats)
|
||||
{
|
||||
|
@ -169,6 +169,35 @@ extern "C" {
|
||||
LIBHDFS_EXTERNAL
|
||||
void hdfsFileFreeReadStatistics(struct hdfsReadStatistics *stats);
|
||||
|
||||
struct hdfsHedgedReadMetrics {
|
||||
uint64_t hedgedReadOps;
|
||||
uint64_t hedgedReadOpsWin;
|
||||
uint64_t hedgedReadOpsInCurThread;
|
||||
};
|
||||
|
||||
/**
|
||||
* Get cluster wide hedged read metrics.
|
||||
*
|
||||
* @param fs The configured filesystem handle
|
||||
* @param metrics (out parameter) on a successful return, the hedged read
|
||||
* metrics. Unchanged otherwise. You must free the returned
|
||||
* statistics with hdfsFreeHedgedReadMetrics.
|
||||
* @return 0 if the metrics were successfully returned, -1 otherwise.
|
||||
* On a failure, please check errno against
|
||||
* ENOTSUP. webhdfs, LocalFilesystem, and so forth may
|
||||
* not support hedged read metrics.
|
||||
*/
|
||||
LIBHDFS_EXTERNAL
|
||||
int hdfsGetHedgedReadMetrics(hdfsFS fs, struct hdfsHedgedReadMetrics **metrics);
|
||||
|
||||
/**
|
||||
* Free HDFS Hedged read metrics.
|
||||
*
|
||||
* @param metrics The HDFS Hedged read metrics to free
|
||||
*/
|
||||
LIBHDFS_EXTERNAL
|
||||
void hdfsFreeHedgedReadMetrics(struct hdfsHedgedReadMetrics *metrics);
|
||||
|
||||
/**
|
||||
* hdfsConnectAsUser - Connect to a hdfs file system as a specific user
|
||||
* Connect to the hdfs.
|
||||
|
Loading…
x
Reference in New Issue
Block a user