diff --git a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/FSOperations.java b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/FSOperations.java index 67d4761543..e272cdc71b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/FSOperations.java +++ b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/FSOperations.java @@ -18,6 +18,7 @@ package org.apache.hadoop.fs.http.server; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.BlockStoragePolicySpi; import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.FileChecksum; @@ -47,7 +48,6 @@ import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus; import org.apache.hadoop.hdfs.protocol.SnapshotStatus; import org.apache.hadoop.hdfs.web.JsonUtil; -import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.lib.service.FileSystemAccess; import org.apache.hadoop.util.StringUtils; import org.json.simple.JSONArray; @@ -73,7 +73,22 @@ * FileSystem operation executors used by {@link HttpFSServer}. */ @InterfaceAudience.Private -public class FSOperations { +public final class FSOperations { + + private static int bufferSize = 4096; + + private FSOperations() { + // not called + } + /** + * Set the buffer size. The size is set during the initialization of + * HttpFSServerWebApp. + * @param conf the configuration to get the bufferSize + */ + public static void setBufferSize(Configuration conf) { + bufferSize = conf.getInt(HTTPFS_BUFFER_SIZE_KEY, + HTTP_BUFFER_SIZE_DEFAULT); + } /** * @param fileStatus a FileStatus object @@ -436,10 +451,9 @@ public FSAppend(InputStream is, String path) { */ @Override public Void execute(FileSystem fs) throws IOException { - int bufferSize = fs.getConf().getInt("httpfs.buffer.size", 4096); OutputStream os = fs.append(path, bufferSize); - IOUtils.copyBytes(is, os, bufferSize, true); - os.close(); + long bytes = copyBytes(is, os); + HttpFSServerWebApp.get().getMetrics().incrBytesWritten(bytes); return null; } @@ -522,6 +536,7 @@ public FSTruncate(String path, long newLength) { @Override public JSONObject execute(FileSystem fs) throws IOException { boolean result = fs.truncate(path, newLength); + HttpFSServerWebApp.get().getMetrics().incrOpsTruncate(); return toJSON( StringUtils.toLowerCase(HttpFSFileSystem.TRUNCATE_JSON), result); } @@ -638,16 +653,65 @@ public Void execute(FileSystem fs) throws IOException { fsPermission = FsCreateModes.create(fsPermission, new FsPermission(unmaskedPermission)); } - int bufferSize = fs.getConf().getInt(HTTPFS_BUFFER_SIZE_KEY, - HTTP_BUFFER_SIZE_DEFAULT); OutputStream os = fs.create(path, fsPermission, override, bufferSize, replication, blockSize, null); - IOUtils.copyBytes(is, os, bufferSize, true); - os.close(); + long bytes = copyBytes(is, os); + HttpFSServerWebApp.get().getMetrics().incrBytesWritten(bytes); return null; } } + /** + * These copyBytes methods combines the two different flavors used originally. + * One with length and another one with buffer size. + * In this impl, buffer size is determined internally, which is a singleton + * normally set during initialization. + * @param in the inputStream + * @param out the outputStream + * @return the totalBytes + * @throws IOException the exception to be thrown. + */ + public static long copyBytes(InputStream in, OutputStream out) + throws IOException { + return copyBytes(in, out, Long.MAX_VALUE); + } + + public static long copyBytes(InputStream in, OutputStream out, long count) + throws IOException { + long totalBytes = 0; + + // If bufferSize is not initialized use 4k. This will not happen + // if all callers check and set it. + byte[] buf = new byte[bufferSize]; + long bytesRemaining = count; + int bytesRead; + + try { + while (bytesRemaining > 0) { + int bytesToRead = (int) + (bytesRemaining < buf.length ? bytesRemaining : buf.length); + + bytesRead = in.read(buf, 0, bytesToRead); + if (bytesRead == -1) { + break; + } + + out.write(buf, 0, bytesRead); + bytesRemaining -= bytesRead; + totalBytes += bytesRead; + } + return totalBytes; + } finally { + // Originally IOUtils.copyBytes() were called with close=true. So we are + // implementing the same behavior here. + try { + in.close(); + } finally { + out.close(); + } + } + } + /** * Executor that performs a delete FileSystemAccess files system operation. */ @@ -680,6 +744,7 @@ public FSDelete(String path, boolean recursive) { @Override public JSONObject execute(FileSystem fs) throws IOException { boolean deleted = fs.delete(path, recursive); + HttpFSServerWebApp.get().getMetrics().incrOpsDelete(); return toJSON( StringUtils.toLowerCase(HttpFSFileSystem.DELETE_JSON), deleted); } @@ -748,6 +813,7 @@ public FSFileStatus(String path) { @Override public Map execute(FileSystem fs) throws IOException { FileStatus status = fs.getFileStatus(path); + HttpFSServerWebApp.get().getMetrics().incrOpsStat(); return toJson(status); } @@ -776,7 +842,6 @@ public JSONObject execute(FileSystem fs) throws IOException { json.put(HttpFSFileSystem.HOME_DIR_JSON, homeDir.toUri().getPath()); return json; } - } /** @@ -814,6 +879,7 @@ public FSListStatus(String path, String filter) throws IOException { @Override public Map execute(FileSystem fs) throws IOException { FileStatus[] fileStatuses = fs.listStatus(path, filter); + HttpFSServerWebApp.get().getMetrics().incrOpsListing(); return toJson(fileStatuses, fs.getFileStatus(path).isFile()); } @@ -905,6 +971,7 @@ public JSONObject execute(FileSystem fs) throws IOException { new FsPermission(unmaskedPermission)); } boolean mkdirs = fs.mkdirs(path, fsPermission); + HttpFSServerWebApp.get().getMetrics().incrOpsMkdir(); return toJSON(HttpFSFileSystem.MKDIRS_JSON, mkdirs); } @@ -937,8 +1004,8 @@ public FSOpen(String path) { */ @Override public InputStream execute(FileSystem fs) throws IOException { - int bufferSize = HttpFSServerWebApp.get().getConfig().getInt( - HTTPFS_BUFFER_SIZE_KEY, HTTP_BUFFER_SIZE_DEFAULT); + // Only updating ops count. bytesRead is updated in InputStreamEntity + HttpFSServerWebApp.get().getMetrics().incrOpsOpen(); return fs.open(path, bufferSize); } @@ -976,6 +1043,7 @@ public FSRename(String path, String toPath) { @Override public JSONObject execute(FileSystem fs) throws IOException { boolean renamed = fs.rename(path, toPath); + HttpFSServerWebApp.get().getMetrics().incrOpsRename(); return toJSON(HttpFSFileSystem.RENAME_JSON, renamed); } @@ -1944,6 +2012,7 @@ public Void execute(FileSystem fs) throws IOException { if (fs instanceof DistributedFileSystem) { DistributedFileSystem dfs = (DistributedFileSystem) fs; dfs.access(path, mode); + HttpFSServerWebApp.get().getMetrics().incrOpsCheckAccess(); } else { throw new UnsupportedOperationException("checkaccess is " + "not supported for HttpFs on " + fs.getClass() diff --git a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/HttpFSServerWebApp.java b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/HttpFSServerWebApp.java index ef2a246586..bf4b2cdad1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/HttpFSServerWebApp.java +++ b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/HttpFSServerWebApp.java @@ -21,9 +21,13 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.fs.http.server.metrics.HttpFSServerMetrics; import org.apache.hadoop.lib.server.ServerException; import org.apache.hadoop.lib.service.FileSystemAccess; import org.apache.hadoop.lib.servlet.ServerWebApp; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.util.JvmPauseMonitor; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,6 +60,7 @@ public class HttpFSServerWebApp extends ServerWebApp { public static final String CONF_ADMIN_GROUP = "admin.group"; private static HttpFSServerWebApp SERVER; + private static HttpFSServerMetrics metrics; private String adminGroup; @@ -102,6 +107,7 @@ public void init() throws ServerException { LOG.info("Connects to Namenode [{}]", get().get(FileSystemAccess.class).getFileSystemConfiguration(). getTrimmed(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY)); + setMetrics(getConfig()); } /** @@ -110,9 +116,22 @@ public void init() throws ServerException { @Override public void destroy() { SERVER = null; + if (metrics != null) { + metrics.shutdown(); + } super.destroy(); } + private static void setMetrics(Configuration config) { + LOG.info("Initializing HttpFSServerMetrics"); + metrics = HttpFSServerMetrics.create(config, "HttpFSServer"); + JvmPauseMonitor pauseMonitor = new JvmPauseMonitor(); + pauseMonitor.init(config); + pauseMonitor.start(); + metrics.getJvmMetrics().setPauseMonitor(pauseMonitor); + FSOperations.setBufferSize(config); + DefaultMetricsSystem.initialize("HttpFSServer"); + } /** * Returns HttpFSServer server singleton, configuration and services are * accessible through it. @@ -123,6 +142,14 @@ public static HttpFSServerWebApp get() { return SERVER; } + /** + * gets the HttpFSServerMetrics instance. + * @return the HttpFSServerMetrics singleton. + */ + public static HttpFSServerMetrics getMetrics() { + return metrics; + } + /** * Returns HttpFSServer admin group. * diff --git a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/metrics/HttpFSServerMetrics.java b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/metrics/HttpFSServerMetrics.java new file mode 100644 index 0000000000..524ec09290 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/metrics/HttpFSServerMetrics.java @@ -0,0 +1,163 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs.http.server.metrics; + +import static org.apache.hadoop.metrics2.impl.MsInfo.SessionId; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.annotation.Metric; +import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.lib.MetricsRegistry; +import org.apache.hadoop.metrics2.lib.MutableCounterLong; +import org.apache.hadoop.metrics2.source.JvmMetrics; + +import java.util.concurrent.ThreadLocalRandom; + +/** + * + * This class is for maintaining the various HttpFSServer statistics + * and publishing them through the metrics interfaces. + * This also registers the JMX MBean for RPC. + *
+ * This class has a number of metrics variables that are publicly accessible; + * these variables (objects) have methods to update their values; + * for example: + *
{@link #bytesRead}.inc()
+ *
+ */
+@InterfaceAudience.Private
+@Metrics(about="HttpFSServer metrics", context="httpfs")
+public class HttpFSServerMetrics {
+
+ private @Metric MutableCounterLong bytesWritten;
+ private @Metric MutableCounterLong bytesRead;
+
+ // Write ops
+ private @Metric MutableCounterLong opsCreate;
+ private @Metric MutableCounterLong opsAppend;
+ private @Metric MutableCounterLong opsTruncate;
+ private @Metric MutableCounterLong opsDelete;
+ private @Metric MutableCounterLong opsRename;
+ private @Metric MutableCounterLong opsMkdir;
+
+ // Read ops
+ private @Metric MutableCounterLong opsOpen;
+ private @Metric MutableCounterLong opsListing;
+ private @Metric MutableCounterLong opsStat;
+ private @Metric MutableCounterLong opsCheckAccess;
+
+ private final MetricsRegistry registry = new MetricsRegistry("httpfsserver");
+ private final String name;
+ private JvmMetrics jvmMetrics = null;
+
+ public HttpFSServerMetrics(String name, String sessionId,
+ final JvmMetrics jvmMetrics) {
+ this.name = name;
+ this.jvmMetrics = jvmMetrics;
+ registry.tag(SessionId, sessionId);
+ }
+
+ public static HttpFSServerMetrics create(Configuration conf,
+ String serverName) {
+ String sessionId = conf.get(DFSConfigKeys.DFS_METRICS_SESSION_ID_KEY);
+ MetricsSystem ms = DefaultMetricsSystem.instance();
+ JvmMetrics jm = JvmMetrics.create("HttpFSServer", sessionId, ms);
+ String name = "ServerActivity-"+ (serverName.isEmpty()
+ ? "UndefinedServer"+ ThreadLocalRandom.current().nextInt()
+ : serverName.replace(':', '-'));
+
+ return ms.register(name, null, new HttpFSServerMetrics(name,
+ sessionId, jm));
+ }
+
+ public String name() {
+ return name;
+ }
+
+ public JvmMetrics getJvmMetrics() {
+ return jvmMetrics;
+ }
+
+ public void incrBytesWritten(long bytes) {
+ bytesWritten.incr(bytes);
+ }
+
+ public void incrBytesRead(long bytes) {
+ bytesRead.incr(bytes);
+ }
+
+ public void incrOpsCreate() {
+ opsCreate.incr();
+ }
+
+ public void incrOpsAppend() {
+ opsAppend.incr();
+ }
+
+ public void incrOpsTruncate() {
+ opsTruncate.incr();
+ }
+
+ public void incrOpsDelete() {
+ opsDelete.incr();
+ }
+
+ public void incrOpsRename() {
+ opsRename.incr();
+ }
+
+ public void incrOpsMkdir() {
+ opsMkdir.incr();
+ }
+
+ public void incrOpsOpen() {
+ opsOpen.incr();
+ }
+
+ public void incrOpsListing() {
+ opsListing.incr();
+ }
+
+ public void incrOpsStat() {
+ opsStat.incr();
+ }
+
+ public void incrOpsCheckAccess() {
+ opsCheckAccess.incr();
+ }
+
+ public void shutdown() {
+ DefaultMetricsSystem.shutdown();
+ }
+
+ public long getOpsMkdir() {
+ return opsMkdir.value();
+ }
+
+ public long getOpsListing() {
+ return opsListing.value();
+ }
+
+ public long getOpsStat() {
+ return opsStat.value();
+ }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/metrics/package-info.java b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/metrics/package-info.java
new file mode 100644
index 0000000000..47e8d4a4c2
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/metrics/package-info.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * A package to implement metrics for the HttpFS Server.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+package org.apache.hadoop.fs.http.server.metrics;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/wsrs/InputStreamEntity.java b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/wsrs/InputStreamEntity.java
index 9edb24a7bc..5f387c9085 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/wsrs/InputStreamEntity.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/wsrs/InputStreamEntity.java
@@ -19,6 +19,9 @@
package org.apache.hadoop.lib.wsrs;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.http.server.FSOperations;
+import org.apache.hadoop.fs.http.server.HttpFSServerWebApp;
+import org.apache.hadoop.fs.http.server.metrics.HttpFSServerMetrics;
import org.apache.hadoop.io.IOUtils;
import javax.ws.rs.core.StreamingOutput;
@@ -45,10 +48,17 @@ public InputStreamEntity(InputStream is) {
@Override
public void write(OutputStream os) throws IOException {
IOUtils.skipFully(is, offset);
+ long bytes = 0L;
if (len == -1) {
- IOUtils.copyBytes(is, os, 4096, true);
+ // Use the configured buffer size instead of hardcoding to 4k
+ bytes = FSOperations.copyBytes(is, os);
} else {
- IOUtils.copyBytes(is, os, len, true);
+ bytes = FSOperations.copyBytes(is, os, len);
+ }
+ // Update metrics.
+ HttpFSServerMetrics metrics = HttpFSServerWebApp.get().getMetrics();
+ if (metrics != null) {
+ metrics.incrBytesRead(bytes);
}
}
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/test/java/org/apache/hadoop/fs/http/server/TestHttpFSServer.java b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/test/java/org/apache/hadoop/fs/http/server/TestHttpFSServer.java
index 434f53cf52..6aa8aa346e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/test/java/org/apache/hadoop/fs/http/server/TestHttpFSServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/test/java/org/apache/hadoop/fs/http/server/TestHttpFSServer.java
@@ -61,6 +61,7 @@
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
@@ -108,6 +109,7 @@
import org.apache.hadoop.thirdparty.com.google.common.collect.Maps;
import java.util.Properties;
+import java.util.concurrent.Callable;
import java.util.regex.Pattern;
import javax.ws.rs.HttpMethod;
@@ -120,6 +122,23 @@
*/
public class TestHttpFSServer extends HFSTestCase {
+ /**
+ * define metric getters for unit tests.
+ */
+ private static Callable