HDFS-7449. Add metrics to NFS gateway. Contributed by Brandon Li
This commit is contained in:
parent
0bcea111e5
commit
f6f2a3f1c7
@ -70,4 +70,7 @@ public class NfsConfigKeys {
|
|||||||
public static final int NFS_HTTPS_PORT_DEFAULT = 50579;
|
public static final int NFS_HTTPS_PORT_DEFAULT = 50579;
|
||||||
public static final String NFS_HTTPS_ADDRESS_KEY = "nfs.https.address";
|
public static final String NFS_HTTPS_ADDRESS_KEY = "nfs.https.address";
|
||||||
public static final String NFS_HTTPS_ADDRESS_DEFAULT = "0.0.0.0:" + NFS_HTTPS_PORT_DEFAULT;
|
public static final String NFS_HTTPS_ADDRESS_DEFAULT = "0.0.0.0:" + NFS_HTTPS_PORT_DEFAULT;
|
||||||
|
|
||||||
|
public static final String NFS_METRICS_PERCENTILES_INTERVALS_KEY = "nfs.metrics.percentiles.intervals";
|
||||||
|
public static final String NFS_METRICS_PERCENTILES_INTERVALS_DEFAULT = "";
|
||||||
}
|
}
|
||||||
|
@ -42,7 +42,8 @@ public Nfs3(NfsConfiguration conf) throws IOException {
|
|||||||
|
|
||||||
public Nfs3(NfsConfiguration conf, DatagramSocket registrationSocket,
|
public Nfs3(NfsConfiguration conf, DatagramSocket registrationSocket,
|
||||||
boolean allowInsecurePorts) throws IOException {
|
boolean allowInsecurePorts) throws IOException {
|
||||||
super(new RpcProgramNfs3(conf, registrationSocket, allowInsecurePorts), conf);
|
super(RpcProgramNfs3.createRpcProgramNfs3(conf, registrationSocket,
|
||||||
|
allowInsecurePorts), conf);
|
||||||
mountd = new Mountd(conf, registrationSocket, allowInsecurePorts);
|
mountd = new Mountd(conf, registrationSocket, allowInsecurePorts);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -0,0 +1,220 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.nfs.nfs3;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.metrics2.impl.MsInfo.SessionId;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
|
import org.apache.hadoop.hdfs.nfs.conf.NfsConfigKeys;
|
||||||
|
import org.apache.hadoop.metrics2.MetricsSystem;
|
||||||
|
import org.apache.hadoop.metrics2.annotation.Metric;
|
||||||
|
import org.apache.hadoop.metrics2.annotation.Metrics;
|
||||||
|
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||||
|
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
|
||||||
|
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
|
||||||
|
import org.apache.hadoop.metrics2.lib.MutableQuantiles;
|
||||||
|
import org.apache.hadoop.metrics2.lib.MutableRate;
|
||||||
|
import org.apache.hadoop.metrics2.source.JvmMetrics;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This class is for maintaining the various NFS gateway activity statistics and
|
||||||
|
* publishing them through the metrics interfaces.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
@Metrics(about = "Nfs3 metrics", context = "dfs")
|
||||||
|
public class Nfs3Metrics {
|
||||||
|
// All mutable rates are in nanoseconds
|
||||||
|
// No metric for nullProcedure;
|
||||||
|
@Metric MutableRate getattr;
|
||||||
|
@Metric MutableRate setattr;
|
||||||
|
@Metric MutableRate lookup;
|
||||||
|
@Metric MutableRate access;
|
||||||
|
@Metric MutableRate readlink;
|
||||||
|
@Metric MutableRate read;
|
||||||
|
final MutableQuantiles[] readNanosQuantiles;
|
||||||
|
@Metric MutableRate write;
|
||||||
|
final MutableQuantiles[] writeNanosQuantiles;
|
||||||
|
@Metric MutableRate create;
|
||||||
|
@Metric MutableRate mkdir;
|
||||||
|
@Metric MutableRate symlink;
|
||||||
|
@Metric MutableRate mknod;
|
||||||
|
@Metric MutableRate remove;
|
||||||
|
@Metric MutableRate rmdir;
|
||||||
|
@Metric MutableRate rename;
|
||||||
|
@Metric MutableRate link;
|
||||||
|
@Metric MutableRate readdir;
|
||||||
|
@Metric MutableRate readdirplus;
|
||||||
|
@Metric MutableRate fsstat;
|
||||||
|
@Metric MutableRate fsinfo;
|
||||||
|
@Metric MutableRate pathconf;
|
||||||
|
@Metric MutableRate commit;
|
||||||
|
final MutableQuantiles[] commitNanosQuantiles;
|
||||||
|
|
||||||
|
@Metric MutableCounterLong bytesWritten;
|
||||||
|
@Metric MutableCounterLong bytesRead;
|
||||||
|
|
||||||
|
final MetricsRegistry registry = new MetricsRegistry("nfs3");
|
||||||
|
final String name;
|
||||||
|
JvmMetrics jvmMetrics = null;
|
||||||
|
|
||||||
|
public Nfs3Metrics(String name, String sessionId, int[] intervals,
|
||||||
|
final JvmMetrics jvmMetrics) {
|
||||||
|
this.name = name;
|
||||||
|
this.jvmMetrics = jvmMetrics;
|
||||||
|
registry.tag(SessionId, sessionId);
|
||||||
|
|
||||||
|
final int len = intervals.length;
|
||||||
|
readNanosQuantiles = new MutableQuantiles[len];
|
||||||
|
writeNanosQuantiles = new MutableQuantiles[len];
|
||||||
|
commitNanosQuantiles = new MutableQuantiles[len];
|
||||||
|
|
||||||
|
for (int i = 0; i < len; i++) {
|
||||||
|
int interval = intervals[i];
|
||||||
|
readNanosQuantiles[i] = registry.newQuantiles("readProcessNanos"
|
||||||
|
+ interval + "s", "Read process in ns", "ops", "latency", interval);
|
||||||
|
writeNanosQuantiles[i] = registry.newQuantiles("writeProcessNanos"
|
||||||
|
+ interval + "s", " process in ns", "ops", "latency", interval);
|
||||||
|
commitNanosQuantiles[i] = registry.newQuantiles("commitProcessNanos"
|
||||||
|
+ interval + "s", "Read process in ns", "ops", "latency", interval);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static Nfs3Metrics create(Configuration conf, String gatewayName) {
|
||||||
|
String sessionId = conf.get(DFSConfigKeys.DFS_METRICS_SESSION_ID_KEY);
|
||||||
|
MetricsSystem ms = DefaultMetricsSystem.instance();
|
||||||
|
JvmMetrics jm = JvmMetrics.create(gatewayName, sessionId, ms);
|
||||||
|
|
||||||
|
// Percentile measurement is [,,,] by default
|
||||||
|
int[] intervals = conf.getInts(conf.get(
|
||||||
|
NfsConfigKeys.NFS_METRICS_PERCENTILES_INTERVALS_KEY,
|
||||||
|
NfsConfigKeys.NFS_METRICS_PERCENTILES_INTERVALS_DEFAULT));
|
||||||
|
return ms.register(new Nfs3Metrics(gatewayName, sessionId, intervals, jm));
|
||||||
|
}
|
||||||
|
|
||||||
|
public String name() {
|
||||||
|
return name;
|
||||||
|
}
|
||||||
|
|
||||||
|
public JvmMetrics getJvmMetrics() {
|
||||||
|
return jvmMetrics;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void incrBytesWritten(long bytes) {
|
||||||
|
bytesWritten.incr(bytes);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void incrBytesRead(long bytes) {
|
||||||
|
bytesRead.incr(bytes);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void addGetattr(long latencyNanos) {
|
||||||
|
getattr.add(latencyNanos);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void addSetattr(long latencyNanos) {
|
||||||
|
setattr.add(latencyNanos);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void addLookup(long latencyNanos) {
|
||||||
|
lookup.add(latencyNanos);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void addAccess(long latencyNanos) {
|
||||||
|
access.add(latencyNanos);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void addReadlink(long latencyNanos) {
|
||||||
|
readlink.add(latencyNanos);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void addRead(long latencyNanos) {
|
||||||
|
read.add(latencyNanos);
|
||||||
|
for (MutableQuantiles q : readNanosQuantiles) {
|
||||||
|
q.add(latencyNanos);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void addWrite(long latencyNanos) {
|
||||||
|
write.add(latencyNanos);
|
||||||
|
for (MutableQuantiles q : writeNanosQuantiles) {
|
||||||
|
q.add(latencyNanos);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void addCreate(long latencyNanos) {
|
||||||
|
create.add(latencyNanos);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void addMkdir(long latencyNanos) {
|
||||||
|
mkdir.add(latencyNanos);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void addSymlink(long latencyNanos) {
|
||||||
|
symlink.add(latencyNanos);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void addMknod(long latencyNanos) {
|
||||||
|
mknod.add(latencyNanos);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void addRemove(long latencyNanos) {
|
||||||
|
remove.add(latencyNanos);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void addRmdir(long latencyNanos) {
|
||||||
|
rmdir.add(latencyNanos);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void addRename(long latencyNanos) {
|
||||||
|
rename.add(latencyNanos);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void addLink(long latencyNanos) {
|
||||||
|
link.add(latencyNanos);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void addReaddir(long latencyNanos) {
|
||||||
|
readdir.add(latencyNanos);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void addReaddirplus(long latencyNanos) {
|
||||||
|
readdirplus.add(latencyNanos);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void addFsstat(long latencyNanos) {
|
||||||
|
fsstat.add(latencyNanos);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void addFsinfo(long latencyNanos) {
|
||||||
|
fsinfo.add(latencyNanos);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void addPathconf(long latencyNanos) {
|
||||||
|
pathconf.add(latencyNanos);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void addCommit(long latencyNanos) {
|
||||||
|
commit.add(latencyNanos);
|
||||||
|
for (MutableQuantiles q : commitNanosQuantiles) {
|
||||||
|
q.add(latencyNanos);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -213,4 +213,8 @@ public static byte[] longToByte(long v) {
|
|||||||
data[7] = (byte) (v >>> 0);
|
data[7] = (byte) (v >>> 0);
|
||||||
return data;
|
return data;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static long getElapsedTime(long startTimeNano) {
|
||||||
|
return System.nanoTime() - startTimeNano;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -129,9 +129,8 @@ static class CommitCtx {
|
|||||||
private final Channel channel;
|
private final Channel channel;
|
||||||
private final int xid;
|
private final int xid;
|
||||||
private final Nfs3FileAttributes preOpAttr;
|
private final Nfs3FileAttributes preOpAttr;
|
||||||
|
|
||||||
// Remember time for debug purpose
|
public final long startTime;
|
||||||
private final long startTime;
|
|
||||||
|
|
||||||
long getOffset() {
|
long getOffset() {
|
||||||
return offset;
|
return offset;
|
||||||
@ -159,7 +158,7 @@ long getStartTime() {
|
|||||||
this.channel = channel;
|
this.channel = channel;
|
||||||
this.xid = xid;
|
this.xid = xid;
|
||||||
this.preOpAttr = preOpAttr;
|
this.preOpAttr = preOpAttr;
|
||||||
this.startTime = Time.monotonicNow();
|
this.startTime = System.nanoTime();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -687,6 +686,8 @@ private void receivedNewWriteInternal(DFSClient dfsClient,
|
|||||||
WccData fileWcc = new WccData(preOpAttr, latestAttr);
|
WccData fileWcc = new WccData(preOpAttr, latestAttr);
|
||||||
WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK,
|
WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK,
|
||||||
fileWcc, count, stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
|
fileWcc, count, stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
|
||||||
|
RpcProgramNfs3.metrics.addWrite(Nfs3Utils
|
||||||
|
.getElapsedTime(writeCtx.startTime));
|
||||||
Nfs3Utils
|
Nfs3Utils
|
||||||
.writeChannel(channel, response.serialize(new XDR(),
|
.writeChannel(channel, response.serialize(new XDR(),
|
||||||
xid, new VerifierNone()), xid);
|
xid, new VerifierNone()), xid);
|
||||||
@ -1131,14 +1132,16 @@ private void processCommits(long offset) {
|
|||||||
|
|
||||||
COMMIT3Response response = new COMMIT3Response(status, wccData,
|
COMMIT3Response response = new COMMIT3Response(status, wccData,
|
||||||
Nfs3Constant.WRITE_COMMIT_VERF);
|
Nfs3Constant.WRITE_COMMIT_VERF);
|
||||||
|
RpcProgramNfs3.metrics.addCommit(Nfs3Utils
|
||||||
|
.getElapsedTime(commit.startTime));
|
||||||
Nfs3Utils.writeChannelCommit(commit.getChannel(), response
|
Nfs3Utils.writeChannelCommit(commit.getChannel(), response
|
||||||
.serialize(new XDR(), commit.getXid(),
|
.serialize(new XDR(), commit.getXid(),
|
||||||
new VerifierNone()), commit.getXid());
|
new VerifierNone()), commit.getXid());
|
||||||
|
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("FileId: " + latestAttr.getFileId() + " Service time:"
|
LOG.debug("FileId: " + latestAttr.getFileId() + " Service time:"
|
||||||
+ (Time.monotonicNow() - commit.getStartTime())
|
+ Nfs3Utils.getElapsedTime(commit.startTime)
|
||||||
+ "ms. Sent response for commit:" + commit);
|
+ "ns. Sent response for commit:" + commit);
|
||||||
}
|
}
|
||||||
entry = pendingCommits.firstEntry();
|
entry = pendingCommits.firstEntry();
|
||||||
}
|
}
|
||||||
@ -1162,6 +1165,7 @@ private void doSingleWrite(final WriteCtx writeCtx) {
|
|||||||
// The write is not protected by lock. asyncState is used to make sure
|
// The write is not protected by lock. asyncState is used to make sure
|
||||||
// there is one thread doing write back at any time
|
// there is one thread doing write back at any time
|
||||||
writeCtx.writeData(fos);
|
writeCtx.writeData(fos);
|
||||||
|
RpcProgramNfs3.metrics.incrBytesWritten(writeCtx.getCount());
|
||||||
|
|
||||||
long flushedOffset = getFlushedOffset();
|
long flushedOffset = getFlushedOffset();
|
||||||
if (flushedOffset != (offset + count)) {
|
if (flushedOffset != (offset + count)) {
|
||||||
@ -1213,6 +1217,7 @@ private void doSingleWrite(final WriteCtx writeCtx) {
|
|||||||
}
|
}
|
||||||
WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK,
|
WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK,
|
||||||
fileWcc, count, stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
|
fileWcc, count, stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
|
||||||
|
RpcProgramNfs3.metrics.addWrite(Nfs3Utils.getElapsedTime(writeCtx.startTime));
|
||||||
Nfs3Utils.writeChannel(channel, response.serialize(
|
Nfs3Utils.writeChannel(channel, response.serialize(
|
||||||
new XDR(), xid, new VerifierNone()), xid);
|
new XDR(), xid, new VerifierNone()), xid);
|
||||||
}
|
}
|
||||||
|
@ -48,6 +48,8 @@
|
|||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||||
import org.apache.hadoop.ipc.RemoteException;
|
import org.apache.hadoop.ipc.RemoteException;
|
||||||
|
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||||
|
import org.apache.hadoop.net.DNS;
|
||||||
import org.apache.hadoop.nfs.AccessPrivilege;
|
import org.apache.hadoop.nfs.AccessPrivilege;
|
||||||
import org.apache.hadoop.nfs.NfsExports;
|
import org.apache.hadoop.nfs.NfsExports;
|
||||||
import org.apache.hadoop.nfs.NfsFileType;
|
import org.apache.hadoop.nfs.NfsFileType;
|
||||||
@ -164,6 +166,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
|
|||||||
private final RpcCallCache rpcCallCache;
|
private final RpcCallCache rpcCallCache;
|
||||||
private JvmPauseMonitor pauseMonitor;
|
private JvmPauseMonitor pauseMonitor;
|
||||||
private Nfs3HttpServer infoServer = null;
|
private Nfs3HttpServer infoServer = null;
|
||||||
|
static Nfs3Metrics metrics;
|
||||||
|
|
||||||
public RpcProgramNfs3(NfsConfiguration config, DatagramSocket registrationSocket,
|
public RpcProgramNfs3(NfsConfiguration config, DatagramSocket registrationSocket,
|
||||||
boolean allowInsecurePorts) throws IOException {
|
boolean allowInsecurePorts) throws IOException {
|
||||||
@ -209,6 +212,17 @@ public RpcProgramNfs3(NfsConfiguration config, DatagramSocket registrationSocket
|
|||||||
infoServer = new Nfs3HttpServer(config);
|
infoServer = new Nfs3HttpServer(config);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static RpcProgramNfs3 createRpcProgramNfs3(NfsConfiguration config,
|
||||||
|
DatagramSocket registrationSocket, boolean allowInsecurePorts)
|
||||||
|
throws IOException {
|
||||||
|
DefaultMetricsSystem.initialize("Nfs3");
|
||||||
|
String displayName = DNS.getDefaultHost("default", "default")
|
||||||
|
+ config.getInt(NfsConfigKeys.DFS_NFS_SERVER_PORT_KEY,
|
||||||
|
NfsConfigKeys.DFS_NFS_SERVER_PORT_DEFAULT);
|
||||||
|
metrics = Nfs3Metrics.create(config, displayName);
|
||||||
|
return new RpcProgramNfs3(config, registrationSocket, allowInsecurePorts);
|
||||||
|
}
|
||||||
|
|
||||||
private void clearDirectory(String writeDumpDir) throws IOException {
|
private void clearDirectory(String writeDumpDir) throws IOException {
|
||||||
File dumpDir = new File(writeDumpDir);
|
File dumpDir = new File(writeDumpDir);
|
||||||
if (dumpDir.exists()) {
|
if (dumpDir.exists()) {
|
||||||
@ -225,10 +239,11 @@ private void clearDirectory(String writeDumpDir) throws IOException {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void startDaemons() {
|
public void startDaemons() {
|
||||||
if (pauseMonitor == null) {
|
if (pauseMonitor == null) {
|
||||||
pauseMonitor = new JvmPauseMonitor(config);
|
pauseMonitor = new JvmPauseMonitor(config);
|
||||||
pauseMonitor.start();
|
pauseMonitor.start();
|
||||||
|
metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);
|
||||||
}
|
}
|
||||||
writeManager.startAsyncDataSerivce();
|
writeManager.startAsyncDataSerivce();
|
||||||
try {
|
try {
|
||||||
@ -770,6 +785,7 @@ READ3Response read(XDR xdr, SecurityHandler securityHandler,
|
|||||||
|
|
||||||
try {
|
try {
|
||||||
readCount = fis.read(offset, readbuffer, 0, count);
|
readCount = fis.read(offset, readbuffer, 0, count);
|
||||||
|
metrics.incrBytesRead(readCount);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
// TODO: A cleaner way is to throw a new type of exception
|
// TODO: A cleaner way is to throw a new type of exception
|
||||||
// which requires incompatible changes.
|
// which requires incompatible changes.
|
||||||
@ -2049,8 +2065,8 @@ COMMIT3Response commit(XDR xdr, Channel channel, int xid,
|
|||||||
: (request.getOffset() + request.getCount());
|
: (request.getOffset() + request.getCount());
|
||||||
|
|
||||||
// Insert commit as an async request
|
// Insert commit as an async request
|
||||||
writeManager.handleCommit(dfsClient, handle, commitOffset,
|
writeManager.handleCommit(dfsClient, handle, commitOffset, channel, xid,
|
||||||
channel, xid, preOpAttr);
|
preOpAttr);
|
||||||
return null;
|
return null;
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
LOG.warn("Exception ", e);
|
LOG.warn("Exception ", e);
|
||||||
@ -2132,20 +2148,29 @@ public void handleInternal(ChannelHandlerContext ctx, RpcInfo info) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Since write and commit could be async, they use their own startTime and
|
||||||
|
// only record success requests.
|
||||||
|
final long startTime = System.nanoTime();
|
||||||
|
|
||||||
NFS3Response response = null;
|
NFS3Response response = null;
|
||||||
if (nfsproc3 == NFSPROC3.NULL) {
|
if (nfsproc3 == NFSPROC3.NULL) {
|
||||||
response = nullProcedure();
|
response = nullProcedure();
|
||||||
} else if (nfsproc3 == NFSPROC3.GETATTR) {
|
} else if (nfsproc3 == NFSPROC3.GETATTR) {
|
||||||
response = getattr(xdr, info);
|
response = getattr(xdr, info);
|
||||||
|
metrics.addGetattr(Nfs3Utils.getElapsedTime(startTime));
|
||||||
} else if (nfsproc3 == NFSPROC3.SETATTR) {
|
} else if (nfsproc3 == NFSPROC3.SETATTR) {
|
||||||
response = setattr(xdr, info);
|
response = setattr(xdr, info);
|
||||||
|
metrics.addSetattr(Nfs3Utils.getElapsedTime(startTime));
|
||||||
} else if (nfsproc3 == NFSPROC3.LOOKUP) {
|
} else if (nfsproc3 == NFSPROC3.LOOKUP) {
|
||||||
response = lookup(xdr, info);
|
response = lookup(xdr, info);
|
||||||
|
metrics.addLookup(Nfs3Utils.getElapsedTime(startTime));
|
||||||
} else if (nfsproc3 == NFSPROC3.ACCESS) {
|
} else if (nfsproc3 == NFSPROC3.ACCESS) {
|
||||||
response = access(xdr, info);
|
response = access(xdr, info);
|
||||||
|
metrics.addAccess(Nfs3Utils.getElapsedTime(startTime));
|
||||||
} else if (nfsproc3 == NFSPROC3.READLINK) {
|
} else if (nfsproc3 == NFSPROC3.READLINK) {
|
||||||
response = readlink(xdr, info);
|
response = readlink(xdr, info);
|
||||||
|
metrics.addReadlink(Nfs3Utils.getElapsedTime(startTime));
|
||||||
} else if (nfsproc3 == NFSPROC3.READ) {
|
} else if (nfsproc3 == NFSPROC3.READ) {
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug(Nfs3Utils.READ_RPC_START + xid);
|
LOG.debug(Nfs3Utils.READ_RPC_START + xid);
|
||||||
@ -2154,6 +2179,7 @@ public void handleInternal(ChannelHandlerContext ctx, RpcInfo info) {
|
|||||||
if (LOG.isDebugEnabled() && (nfsproc3 == NFSPROC3.READ)) {
|
if (LOG.isDebugEnabled() && (nfsproc3 == NFSPROC3.READ)) {
|
||||||
LOG.debug(Nfs3Utils.READ_RPC_END + xid);
|
LOG.debug(Nfs3Utils.READ_RPC_END + xid);
|
||||||
}
|
}
|
||||||
|
metrics.addRead(Nfs3Utils.getElapsedTime(startTime));
|
||||||
} else if (nfsproc3 == NFSPROC3.WRITE) {
|
} else if (nfsproc3 == NFSPROC3.WRITE) {
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug(Nfs3Utils.WRITE_RPC_START + xid);
|
LOG.debug(Nfs3Utils.WRITE_RPC_START + xid);
|
||||||
@ -2162,30 +2188,43 @@ public void handleInternal(ChannelHandlerContext ctx, RpcInfo info) {
|
|||||||
// Write end debug trace is in Nfs3Utils.writeChannel
|
// Write end debug trace is in Nfs3Utils.writeChannel
|
||||||
} else if (nfsproc3 == NFSPROC3.CREATE) {
|
} else if (nfsproc3 == NFSPROC3.CREATE) {
|
||||||
response = create(xdr, info);
|
response = create(xdr, info);
|
||||||
|
metrics.addCreate(Nfs3Utils.getElapsedTime(startTime));
|
||||||
} else if (nfsproc3 == NFSPROC3.MKDIR) {
|
} else if (nfsproc3 == NFSPROC3.MKDIR) {
|
||||||
response = mkdir(xdr, info);
|
response = mkdir(xdr, info);
|
||||||
|
metrics.addMkdir(Nfs3Utils.getElapsedTime(startTime));
|
||||||
} else if (nfsproc3 == NFSPROC3.SYMLINK) {
|
} else if (nfsproc3 == NFSPROC3.SYMLINK) {
|
||||||
response = symlink(xdr, info);
|
response = symlink(xdr, info);
|
||||||
|
metrics.addSymlink(Nfs3Utils.getElapsedTime(startTime));
|
||||||
} else if (nfsproc3 == NFSPROC3.MKNOD) {
|
} else if (nfsproc3 == NFSPROC3.MKNOD) {
|
||||||
response = mknod(xdr, info);
|
response = mknod(xdr, info);
|
||||||
|
metrics.addMknod(Nfs3Utils.getElapsedTime(startTime));
|
||||||
} else if (nfsproc3 == NFSPROC3.REMOVE) {
|
} else if (nfsproc3 == NFSPROC3.REMOVE) {
|
||||||
response = remove(xdr, info);
|
response = remove(xdr, info);
|
||||||
|
metrics.addRemove(Nfs3Utils.getElapsedTime(startTime));
|
||||||
} else if (nfsproc3 == NFSPROC3.RMDIR) {
|
} else if (nfsproc3 == NFSPROC3.RMDIR) {
|
||||||
response = rmdir(xdr, info);
|
response = rmdir(xdr, info);
|
||||||
|
metrics.addRmdir(Nfs3Utils.getElapsedTime(startTime));
|
||||||
} else if (nfsproc3 == NFSPROC3.RENAME) {
|
} else if (nfsproc3 == NFSPROC3.RENAME) {
|
||||||
response = rename(xdr, info);
|
response = rename(xdr, info);
|
||||||
|
metrics.addRename(Nfs3Utils.getElapsedTime(startTime));
|
||||||
} else if (nfsproc3 == NFSPROC3.LINK) {
|
} else if (nfsproc3 == NFSPROC3.LINK) {
|
||||||
response = link(xdr, info);
|
response = link(xdr, info);
|
||||||
|
metrics.addLink(Nfs3Utils.getElapsedTime(startTime));
|
||||||
} else if (nfsproc3 == NFSPROC3.READDIR) {
|
} else if (nfsproc3 == NFSPROC3.READDIR) {
|
||||||
response = readdir(xdr, info);
|
response = readdir(xdr, info);
|
||||||
|
metrics.addReaddir(Nfs3Utils.getElapsedTime(startTime));
|
||||||
} else if (nfsproc3 == NFSPROC3.READDIRPLUS) {
|
} else if (nfsproc3 == NFSPROC3.READDIRPLUS) {
|
||||||
response = readdirplus(xdr, info);
|
response = readdirplus(xdr, info);
|
||||||
|
metrics.addReaddirplus(Nfs3Utils.getElapsedTime(startTime));
|
||||||
} else if (nfsproc3 == NFSPROC3.FSSTAT) {
|
} else if (nfsproc3 == NFSPROC3.FSSTAT) {
|
||||||
response = fsstat(xdr, info);
|
response = fsstat(xdr, info);
|
||||||
|
metrics.addFsstat(Nfs3Utils.getElapsedTime(startTime));
|
||||||
} else if (nfsproc3 == NFSPROC3.FSINFO) {
|
} else if (nfsproc3 == NFSPROC3.FSINFO) {
|
||||||
response = fsinfo(xdr, info);
|
response = fsinfo(xdr, info);
|
||||||
|
metrics.addFsinfo(Nfs3Utils.getElapsedTime(startTime));
|
||||||
} else if (nfsproc3 == NFSPROC3.PATHCONF) {
|
} else if (nfsproc3 == NFSPROC3.PATHCONF) {
|
||||||
response = pathconf(xdr,info);
|
response = pathconf(xdr, info);
|
||||||
|
metrics.addPathconf(Nfs3Utils.getElapsedTime(startTime));
|
||||||
} else if (nfsproc3 == NFSPROC3.COMMIT) {
|
} else if (nfsproc3 == NFSPROC3.COMMIT) {
|
||||||
response = commit(xdr, info);
|
response = commit(xdr, info);
|
||||||
} else {
|
} else {
|
||||||
|
@ -84,7 +84,8 @@ public int getOriginalCount() {
|
|||||||
private long dumpFileOffset;
|
private long dumpFileOffset;
|
||||||
|
|
||||||
private volatile DataState dataState;
|
private volatile DataState dataState;
|
||||||
|
public final long startTime;
|
||||||
|
|
||||||
public DataState getDataState() {
|
public DataState getDataState() {
|
||||||
return dataState;
|
return dataState;
|
||||||
}
|
}
|
||||||
@ -235,6 +236,7 @@ void setReplied(boolean replied) {
|
|||||||
this.replied = replied;
|
this.replied = replied;
|
||||||
this.dataState = dataState;
|
this.dataState = dataState;
|
||||||
raf = null;
|
raf = null;
|
||||||
|
this.startTime = System.nanoTime();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -224,6 +224,7 @@ int commitBeforeRead(DFSClient dfsClient, FileHandle fileHandle,
|
|||||||
status = Nfs3Status.NFS3_OK;
|
status = Nfs3Status.NFS3_OK;
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
|
// commit request triggered by read won't create pending commit obj
|
||||||
COMMIT_STATUS ret = openFileCtx.checkCommit(dfsClient, commitOffset,
|
COMMIT_STATUS ret = openFileCtx.checkCommit(dfsClient, commitOffset,
|
||||||
null, 0, null, true);
|
null, 0, null, true);
|
||||||
switch (ret) {
|
switch (ret) {
|
||||||
@ -260,6 +261,7 @@ int commitBeforeRead(DFSClient dfsClient, FileHandle fileHandle,
|
|||||||
|
|
||||||
void handleCommit(DFSClient dfsClient, FileHandle fileHandle,
|
void handleCommit(DFSClient dfsClient, FileHandle fileHandle,
|
||||||
long commitOffset, Channel channel, int xid, Nfs3FileAttributes preOpAttr) {
|
long commitOffset, Channel channel, int xid, Nfs3FileAttributes preOpAttr) {
|
||||||
|
long startTime = System.nanoTime();
|
||||||
int status;
|
int status;
|
||||||
OpenFileCtx openFileCtx = fileContextCache.get(fileHandle);
|
OpenFileCtx openFileCtx = fileContextCache.get(fileHandle);
|
||||||
|
|
||||||
@ -306,9 +308,9 @@ void handleCommit(DFSClient dfsClient, FileHandle fileHandle,
|
|||||||
WccData fileWcc = new WccData(Nfs3Utils.getWccAttr(preOpAttr), postOpAttr);
|
WccData fileWcc = new WccData(Nfs3Utils.getWccAttr(preOpAttr), postOpAttr);
|
||||||
COMMIT3Response response = new COMMIT3Response(status, fileWcc,
|
COMMIT3Response response = new COMMIT3Response(status, fileWcc,
|
||||||
Nfs3Constant.WRITE_COMMIT_VERF);
|
Nfs3Constant.WRITE_COMMIT_VERF);
|
||||||
|
RpcProgramNfs3.metrics.addCommit(Nfs3Utils.getElapsedTime(startTime));
|
||||||
Nfs3Utils.writeChannelCommit(channel,
|
Nfs3Utils.writeChannelCommit(channel,
|
||||||
response.serialize(new XDR(), xid, new VerifierNone()),
|
response.serialize(new XDR(), xid, new VerifierNone()), xid);
|
||||||
xid);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -48,6 +48,10 @@ public static void setUp() throws Exception {
|
|||||||
HttpConfig.Policy.HTTP_AND_HTTPS.name());
|
HttpConfig.Policy.HTTP_AND_HTTPS.name());
|
||||||
conf.set(NfsConfigKeys.NFS_HTTP_ADDRESS_KEY, "localhost:0");
|
conf.set(NfsConfigKeys.NFS_HTTP_ADDRESS_KEY, "localhost:0");
|
||||||
conf.set(NfsConfigKeys.NFS_HTTPS_ADDRESS_KEY, "localhost:0");
|
conf.set(NfsConfigKeys.NFS_HTTPS_ADDRESS_KEY, "localhost:0");
|
||||||
|
// Use ephemeral port in case tests are running in parallel
|
||||||
|
conf.setInt(NfsConfigKeys.DFS_NFS_SERVER_PORT_KEY, 0);
|
||||||
|
conf.setInt(NfsConfigKeys.DFS_NFS_MOUNTD_PORT_KEY, 0);
|
||||||
|
|
||||||
File base = new File(BASEDIR);
|
File base = new File(BASEDIR);
|
||||||
FileUtil.fullyDelete(base);
|
FileUtil.fullyDelete(base);
|
||||||
base.mkdirs();
|
base.mkdirs();
|
||||||
|
@ -273,6 +273,8 @@ Release 2.7.0 - UNRELEASED
|
|||||||
(Maysam Yabandeh via wang)
|
(Maysam Yabandeh via wang)
|
||||||
|
|
||||||
HDFS-7424. Add web UI for NFS gateway (brandonli)
|
HDFS-7424. Add web UI for NFS gateway (brandonli)
|
||||||
|
|
||||||
|
HDFS-7449. Add metrics to NFS gateway (brandonli)
|
||||||
|
|
||||||
IMPROVEMENTS
|
IMPROVEMENTS
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user