HDFS-15711. Add Metrics to HttpFS Server. (#2521) Contributed by Ahmed Hussein and Kihwal Lee

This commit is contained in:
Ahmed Hussein 2020-12-10 13:09:53 -06:00 committed by GitHub
parent c2cecfc9b9
commit 3ec01b1bb3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 352 additions and 19 deletions

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.fs.http.server;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockStoragePolicySpi;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileChecksum;
@ -47,7 +48,6 @@
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
import org.apache.hadoop.hdfs.protocol.SnapshotStatus;
import org.apache.hadoop.hdfs.web.JsonUtil;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.lib.service.FileSystemAccess;
import org.apache.hadoop.util.StringUtils;
import org.json.simple.JSONArray;
@ -73,7 +73,22 @@
* FileSystem operation executors used by {@link HttpFSServer}.
*/
@InterfaceAudience.Private
public class FSOperations {
public final class FSOperations {
private static int bufferSize = 4096;
private FSOperations() {
// not called
}
/**
* Set the buffer size. The size is set during the initialization of
* HttpFSServerWebApp.
* @param conf the configuration to get the bufferSize
*/
public static void setBufferSize(Configuration conf) {
bufferSize = conf.getInt(HTTPFS_BUFFER_SIZE_KEY,
HTTP_BUFFER_SIZE_DEFAULT);
}
/**
* @param fileStatus a FileStatus object
@ -436,10 +451,9 @@ public FSAppend(InputStream is, String path) {
*/
@Override
public Void execute(FileSystem fs) throws IOException {
int bufferSize = fs.getConf().getInt("httpfs.buffer.size", 4096);
OutputStream os = fs.append(path, bufferSize);
IOUtils.copyBytes(is, os, bufferSize, true);
os.close();
long bytes = copyBytes(is, os);
HttpFSServerWebApp.get().getMetrics().incrBytesWritten(bytes);
return null;
}
@ -522,6 +536,7 @@ public FSTruncate(String path, long newLength) {
@Override
public JSONObject execute(FileSystem fs) throws IOException {
boolean result = fs.truncate(path, newLength);
HttpFSServerWebApp.get().getMetrics().incrOpsTruncate();
return toJSON(
StringUtils.toLowerCase(HttpFSFileSystem.TRUNCATE_JSON), result);
}
@ -638,16 +653,65 @@ public Void execute(FileSystem fs) throws IOException {
fsPermission = FsCreateModes.create(fsPermission,
new FsPermission(unmaskedPermission));
}
int bufferSize = fs.getConf().getInt(HTTPFS_BUFFER_SIZE_KEY,
HTTP_BUFFER_SIZE_DEFAULT);
OutputStream os = fs.create(path, fsPermission, override, bufferSize, replication, blockSize, null);
IOUtils.copyBytes(is, os, bufferSize, true);
os.close();
long bytes = copyBytes(is, os);
HttpFSServerWebApp.get().getMetrics().incrBytesWritten(bytes);
return null;
}
}
/**
* These copyBytes methods combines the two different flavors used originally.
* One with length and another one with buffer size.
* In this impl, buffer size is determined internally, which is a singleton
* normally set during initialization.
* @param in the inputStream
* @param out the outputStream
* @return the totalBytes
* @throws IOException the exception to be thrown.
*/
public static long copyBytes(InputStream in, OutputStream out)
throws IOException {
return copyBytes(in, out, Long.MAX_VALUE);
}
public static long copyBytes(InputStream in, OutputStream out, long count)
throws IOException {
long totalBytes = 0;
// If bufferSize is not initialized use 4k. This will not happen
// if all callers check and set it.
byte[] buf = new byte[bufferSize];
long bytesRemaining = count;
int bytesRead;
try {
while (bytesRemaining > 0) {
int bytesToRead = (int)
(bytesRemaining < buf.length ? bytesRemaining : buf.length);
bytesRead = in.read(buf, 0, bytesToRead);
if (bytesRead == -1) {
break;
}
out.write(buf, 0, bytesRead);
bytesRemaining -= bytesRead;
totalBytes += bytesRead;
}
return totalBytes;
} finally {
// Originally IOUtils.copyBytes() were called with close=true. So we are
// implementing the same behavior here.
try {
in.close();
} finally {
out.close();
}
}
}
/**
* Executor that performs a delete FileSystemAccess files system operation.
*/
@ -680,6 +744,7 @@ public FSDelete(String path, boolean recursive) {
@Override
public JSONObject execute(FileSystem fs) throws IOException {
boolean deleted = fs.delete(path, recursive);
HttpFSServerWebApp.get().getMetrics().incrOpsDelete();
return toJSON(
StringUtils.toLowerCase(HttpFSFileSystem.DELETE_JSON), deleted);
}
@ -748,6 +813,7 @@ public FSFileStatus(String path) {
@Override
public Map execute(FileSystem fs) throws IOException {
FileStatus status = fs.getFileStatus(path);
HttpFSServerWebApp.get().getMetrics().incrOpsStat();
return toJson(status);
}
@ -776,7 +842,6 @@ public JSONObject execute(FileSystem fs) throws IOException {
json.put(HttpFSFileSystem.HOME_DIR_JSON, homeDir.toUri().getPath());
return json;
}
}
/**
@ -814,6 +879,7 @@ public FSListStatus(String path, String filter) throws IOException {
@Override
public Map execute(FileSystem fs) throws IOException {
FileStatus[] fileStatuses = fs.listStatus(path, filter);
HttpFSServerWebApp.get().getMetrics().incrOpsListing();
return toJson(fileStatuses, fs.getFileStatus(path).isFile());
}
@ -905,6 +971,7 @@ public JSONObject execute(FileSystem fs) throws IOException {
new FsPermission(unmaskedPermission));
}
boolean mkdirs = fs.mkdirs(path, fsPermission);
HttpFSServerWebApp.get().getMetrics().incrOpsMkdir();
return toJSON(HttpFSFileSystem.MKDIRS_JSON, mkdirs);
}
@ -937,8 +1004,8 @@ public FSOpen(String path) {
*/
@Override
public InputStream execute(FileSystem fs) throws IOException {
int bufferSize = HttpFSServerWebApp.get().getConfig().getInt(
HTTPFS_BUFFER_SIZE_KEY, HTTP_BUFFER_SIZE_DEFAULT);
// Only updating ops count. bytesRead is updated in InputStreamEntity
HttpFSServerWebApp.get().getMetrics().incrOpsOpen();
return fs.open(path, bufferSize);
}
@ -976,6 +1043,7 @@ public FSRename(String path, String toPath) {
@Override
public JSONObject execute(FileSystem fs) throws IOException {
boolean renamed = fs.rename(path, toPath);
HttpFSServerWebApp.get().getMetrics().incrOpsRename();
return toJSON(HttpFSFileSystem.RENAME_JSON, renamed);
}
@ -1944,6 +2012,7 @@ public Void execute(FileSystem fs) throws IOException {
if (fs instanceof DistributedFileSystem) {
DistributedFileSystem dfs = (DistributedFileSystem) fs;
dfs.access(path, mode);
HttpFSServerWebApp.get().getMetrics().incrOpsCheckAccess();
} else {
throw new UnsupportedOperationException("checkaccess is "
+ "not supported for HttpFs on " + fs.getClass()

View File

@ -21,9 +21,13 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.http.server.metrics.HttpFSServerMetrics;
import org.apache.hadoop.lib.server.ServerException;
import org.apache.hadoop.lib.service.FileSystemAccess;
import org.apache.hadoop.lib.servlet.ServerWebApp;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.util.JvmPauseMonitor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -56,6 +60,7 @@ public class HttpFSServerWebApp extends ServerWebApp {
public static final String CONF_ADMIN_GROUP = "admin.group";
private static HttpFSServerWebApp SERVER;
private static HttpFSServerMetrics metrics;
private String adminGroup;
@ -102,6 +107,7 @@ public void init() throws ServerException {
LOG.info("Connects to Namenode [{}]",
get().get(FileSystemAccess.class).getFileSystemConfiguration().
getTrimmed(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY));
setMetrics(getConfig());
}
/**
@ -110,9 +116,22 @@ public void init() throws ServerException {
@Override
public void destroy() {
SERVER = null;
if (metrics != null) {
metrics.shutdown();
}
super.destroy();
}
private static void setMetrics(Configuration config) {
LOG.info("Initializing HttpFSServerMetrics");
metrics = HttpFSServerMetrics.create(config, "HttpFSServer");
JvmPauseMonitor pauseMonitor = new JvmPauseMonitor();
pauseMonitor.init(config);
pauseMonitor.start();
metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);
FSOperations.setBufferSize(config);
DefaultMetricsSystem.initialize("HttpFSServer");
}
/**
* Returns HttpFSServer server singleton, configuration and services are
* accessible through it.
@ -123,6 +142,14 @@ public static HttpFSServerWebApp get() {
return SERVER;
}
/**
* gets the HttpFSServerMetrics instance.
* @return the HttpFSServerMetrics singleton.
*/
public static HttpFSServerMetrics getMetrics() {
return metrics;
}
/**
* Returns HttpFSServer admin group.
*

View File

@ -0,0 +1,163 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.http.server.metrics;
import static org.apache.hadoop.metrics2.impl.MsInfo.SessionId;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.apache.hadoop.metrics2.source.JvmMetrics;
import java.util.concurrent.ThreadLocalRandom;
/**
*
* This class is for maintaining the various HttpFSServer statistics
* and publishing them through the metrics interfaces.
* This also registers the JMX MBean for RPC.
* <p>
* This class has a number of metrics variables that are publicly accessible;
* these variables (objects) have methods to update their values;
* for example:
* <p> {@link #bytesRead}.inc()
*
*/
@InterfaceAudience.Private
@Metrics(about="HttpFSServer metrics", context="httpfs")
public class HttpFSServerMetrics {
private @Metric MutableCounterLong bytesWritten;
private @Metric MutableCounterLong bytesRead;
// Write ops
private @Metric MutableCounterLong opsCreate;
private @Metric MutableCounterLong opsAppend;
private @Metric MutableCounterLong opsTruncate;
private @Metric MutableCounterLong opsDelete;
private @Metric MutableCounterLong opsRename;
private @Metric MutableCounterLong opsMkdir;
// Read ops
private @Metric MutableCounterLong opsOpen;
private @Metric MutableCounterLong opsListing;
private @Metric MutableCounterLong opsStat;
private @Metric MutableCounterLong opsCheckAccess;
private final MetricsRegistry registry = new MetricsRegistry("httpfsserver");
private final String name;
private JvmMetrics jvmMetrics = null;
public HttpFSServerMetrics(String name, String sessionId,
final JvmMetrics jvmMetrics) {
this.name = name;
this.jvmMetrics = jvmMetrics;
registry.tag(SessionId, sessionId);
}
public static HttpFSServerMetrics create(Configuration conf,
String serverName) {
String sessionId = conf.get(DFSConfigKeys.DFS_METRICS_SESSION_ID_KEY);
MetricsSystem ms = DefaultMetricsSystem.instance();
JvmMetrics jm = JvmMetrics.create("HttpFSServer", sessionId, ms);
String name = "ServerActivity-"+ (serverName.isEmpty()
? "UndefinedServer"+ ThreadLocalRandom.current().nextInt()
: serverName.replace(':', '-'));
return ms.register(name, null, new HttpFSServerMetrics(name,
sessionId, jm));
}
public String name() {
return name;
}
public JvmMetrics getJvmMetrics() {
return jvmMetrics;
}
public void incrBytesWritten(long bytes) {
bytesWritten.incr(bytes);
}
public void incrBytesRead(long bytes) {
bytesRead.incr(bytes);
}
public void incrOpsCreate() {
opsCreate.incr();
}
public void incrOpsAppend() {
opsAppend.incr();
}
public void incrOpsTruncate() {
opsTruncate.incr();
}
public void incrOpsDelete() {
opsDelete.incr();
}
public void incrOpsRename() {
opsRename.incr();
}
public void incrOpsMkdir() {
opsMkdir.incr();
}
public void incrOpsOpen() {
opsOpen.incr();
}
public void incrOpsListing() {
opsListing.incr();
}
public void incrOpsStat() {
opsStat.incr();
}
public void incrOpsCheckAccess() {
opsCheckAccess.incr();
}
public void shutdown() {
DefaultMetricsSystem.shutdown();
}
public long getOpsMkdir() {
return opsMkdir.value();
}
public long getOpsListing() {
return opsListing.value();
}
public long getOpsStat() {
return opsStat.value();
}
}

View File

@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* A package to implement metrics for the HttpFS Server.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
package org.apache.hadoop.fs.http.server.metrics;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

View File

@ -19,6 +19,9 @@
package org.apache.hadoop.lib.wsrs;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.http.server.FSOperations;
import org.apache.hadoop.fs.http.server.HttpFSServerWebApp;
import org.apache.hadoop.fs.http.server.metrics.HttpFSServerMetrics;
import org.apache.hadoop.io.IOUtils;
import javax.ws.rs.core.StreamingOutput;
@ -45,10 +48,17 @@ public InputStreamEntity(InputStream is) {
@Override
public void write(OutputStream os) throws IOException {
IOUtils.skipFully(is, offset);
long bytes = 0L;
if (len == -1) {
IOUtils.copyBytes(is, os, 4096, true);
// Use the configured buffer size instead of hardcoding to 4k
bytes = FSOperations.copyBytes(is, os);
} else {
IOUtils.copyBytes(is, os, len, true);
bytes = FSOperations.copyBytes(is, os, len);
}
// Update metrics.
HttpFSServerMetrics metrics = HttpFSServerWebApp.get().getMetrics();
if (metrics != null) {
metrics.incrBytesRead(bytes);
}
}
}

View File

@ -61,6 +61,7 @@
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
@ -108,6 +109,7 @@
import org.apache.hadoop.thirdparty.com.google.common.collect.Maps;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.regex.Pattern;
import javax.ws.rs.HttpMethod;
@ -120,6 +122,23 @@
*/
public class TestHttpFSServer extends HFSTestCase {
/**
* define metric getters for unit tests.
*/
private static Callable<Long> defaultEntryMetricGetter = () -> 0L;
private static Callable<Long> defaultExitMetricGetter = () -> 1L;
private static HashMap<String, Callable<Long>> metricsGetter =
new HashMap<String, Callable<Long>>() {
{
put("LISTSTATUS",
() -> HttpFSServerWebApp.get().getMetrics().getOpsListing());
put("MKDIRS",
() -> HttpFSServerWebApp.get().getMetrics().getOpsMkdir());
put("GETFILESTATUS",
() -> HttpFSServerWebApp.get().getMetrics().getOpsStat());
}
};
@Test
@TestDir
@TestJetty
@ -408,7 +427,8 @@ public void instrumentation() throws Exception {
@TestHdfs
public void testHdfsAccess() throws Exception {
createHttpFSServer(false, false);
long oldOpsListStatus =
metricsGetter.get("LISTSTATUS").call();
String user = HadoopUsersConfTestHelper.getHadoopUsers()[0];
URL url = new URL(TestJettyHelper.getJettyURL(),
MessageFormat.format("/webhdfs/v1/?user.name={0}&op=liststatus",
@ -419,6 +439,8 @@ public void testHdfsAccess() throws Exception {
new InputStreamReader(conn.getInputStream()));
reader.readLine();
reader.close();
Assert.assertEquals(1 + oldOpsListStatus,
(long) metricsGetter.get("LISTSTATUS").call());
}
@Test
@ -427,7 +449,8 @@ public void testHdfsAccess() throws Exception {
@TestHdfs
public void testMkdirs() throws Exception {
createHttpFSServer(false, false);
long oldMkdirOpsStat =
metricsGetter.get("MKDIRS").call();
String user = HadoopUsersConfTestHelper.getHadoopUsers()[0];
URL url = new URL(TestJettyHelper.getJettyURL(), MessageFormat.format(
"/webhdfs/v1/tmp/sub-tmp?user.name={0}&op=MKDIRS", user));
@ -435,8 +458,10 @@ public void testMkdirs() throws Exception {
conn.setRequestMethod("PUT");
conn.connect();
Assert.assertEquals(conn.getResponseCode(), HttpURLConnection.HTTP_OK);
getStatus("/tmp/sub-tmp", "LISTSTATUS");
long opsStat =
metricsGetter.get("MKDIRS").call();
Assert.assertEquals(1 + oldMkdirOpsStat, opsStat);
}
@Test
@ -445,7 +470,8 @@ public void testMkdirs() throws Exception {
@TestHdfs
public void testGlobFilter() throws Exception {
createHttpFSServer(false, false);
long oldOpsListStatus =
metricsGetter.get("LISTSTATUS").call();
FileSystem fs = FileSystem.get(TestHdfsHelper.getHdfsConf());
fs.mkdirs(new Path("/tmp"));
fs.create(new Path("/tmp/foo.txt")).close();
@ -460,6 +486,8 @@ public void testGlobFilter() throws Exception {
new InputStreamReader(conn.getInputStream()));
reader.readLine();
reader.close();
Assert.assertEquals(1 + oldOpsListStatus,
(long) metricsGetter.get("LISTSTATUS").call());
}
/**
@ -519,6 +547,9 @@ private void createWithHttp(String filename, String perms,
*/
private void createDirWithHttp(String dirname, String perms,
String unmaskedPerms) throws Exception {
// get the createDirMetrics
long oldOpsMkdir =
metricsGetter.get("MKDIRS").call();
String user = HadoopUsersConfTestHelper.getHadoopUsers()[0];
// Remove leading / from filename
if (dirname.charAt(0) == '/') {
@ -542,6 +573,8 @@ private void createDirWithHttp(String dirname, String perms,
conn.setRequestMethod("PUT");
conn.connect();
Assert.assertEquals(HttpURLConnection.HTTP_OK, conn.getResponseCode());
Assert.assertEquals(1 + oldOpsMkdir,
(long) metricsGetter.get("MKDIRS").call());
}
/**
@ -555,6 +588,8 @@ private void createDirWithHttp(String dirname, String perms,
*/
private String getStatus(String filename, String command)
throws Exception {
long oldOpsStat =
metricsGetter.getOrDefault(command, defaultEntryMetricGetter).call();
String user = HadoopUsersConfTestHelper.getHadoopUsers()[0];
// Remove leading / from filename
if (filename.charAt(0) == '/') {
@ -570,7 +605,9 @@ private String getStatus(String filename, String command)
BufferedReader reader =
new BufferedReader(new InputStreamReader(conn.getInputStream()));
long opsStat =
metricsGetter.getOrDefault(command, defaultExitMetricGetter).call();
Assert.assertEquals(oldOpsStat + 1L, opsStat);
return reader.readLine();
}