HDFS-5364. Add OpenFileCtx cache. Contributed by Brandon Li

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1539834 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Brandon Li 2013-11-07 21:49:13 +00:00
parent 77afc605fd
commit 3fccdec6e0
15 changed files with 582 additions and 139 deletions

View File

@ -52,6 +52,7 @@ public MountdBase(List<String> exports, RpcProgram program) throws IOException {
private void startUDPServer() { private void startUDPServer() {
SimpleUdpServer udpServer = new SimpleUdpServer(rpcProgram.getPort(), SimpleUdpServer udpServer = new SimpleUdpServer(rpcProgram.getPort(),
rpcProgram, 1); rpcProgram, 1);
rpcProgram.startDaemons();
udpServer.run(); udpServer.run();
} }
@ -59,6 +60,7 @@ private void startUDPServer() {
private void startTCPServer() { private void startTCPServer() {
SimpleTcpServer tcpServer = new SimpleTcpServer(rpcProgram.getPort(), SimpleTcpServer tcpServer = new SimpleTcpServer(rpcProgram.getPort(),
rpcProgram, 1); rpcProgram, 1);
rpcProgram.startDaemons();
tcpServer.run(); tcpServer.run();
} }

View File

@ -20,7 +20,6 @@
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mount.MountdBase;
import org.apache.hadoop.oncrpc.RpcProgram; import org.apache.hadoop.oncrpc.RpcProgram;
import org.apache.hadoop.oncrpc.SimpleTcpServer; import org.apache.hadoop.oncrpc.SimpleTcpServer;
import org.apache.hadoop.portmap.PortmapMapping; import org.apache.hadoop.portmap.PortmapMapping;
@ -32,34 +31,27 @@
*/ */
public abstract class Nfs3Base { public abstract class Nfs3Base {
public static final Log LOG = LogFactory.getLog(Nfs3Base.class); public static final Log LOG = LogFactory.getLog(Nfs3Base.class);
private final MountdBase mountd;
private final RpcProgram rpcProgram; private final RpcProgram rpcProgram;
private final int nfsPort; private final int nfsPort;
public MountdBase getMountBase() {
return mountd;
}
public RpcProgram getRpcProgram() { public RpcProgram getRpcProgram() {
return rpcProgram; return rpcProgram;
} }
protected Nfs3Base(MountdBase mountd, RpcProgram program, Configuration conf) { protected Nfs3Base(RpcProgram rpcProgram, Configuration conf) {
this.mountd = mountd; this.rpcProgram = rpcProgram;
this.rpcProgram = program;
this.nfsPort = conf.getInt("nfs3.server.port", Nfs3Constant.PORT); this.nfsPort = conf.getInt("nfs3.server.port", Nfs3Constant.PORT);
LOG.info("NFS server port set to: "+nfsPort); LOG.info("NFS server port set to: " + nfsPort);
} }
protected Nfs3Base(MountdBase mountd, RpcProgram program) { protected Nfs3Base(RpcProgram rpcProgram) {
this.mountd = mountd; this.rpcProgram = rpcProgram;
this.rpcProgram = program;
this.nfsPort = Nfs3Constant.PORT; this.nfsPort = Nfs3Constant.PORT;
} }
public void start(boolean register) { public void start(boolean register) {
mountd.start(register); // Start mountd
startTCPServer(); // Start TCP server startTCPServer(); // Start TCP server
if (register) { if (register) {
rpcProgram.register(PortmapMapping.TRANSPORT_TCP); rpcProgram.register(PortmapMapping.TRANSPORT_TCP);
} }
@ -68,6 +60,7 @@ public void start(boolean register) {
private void startTCPServer() { private void startTCPServer() {
SimpleTcpServer tcpServer = new SimpleTcpServer(nfsPort, SimpleTcpServer tcpServer = new SimpleTcpServer(nfsPort,
rpcProgram, 0); rpcProgram, 0);
rpcProgram.startDaemons();
tcpServer.run(); tcpServer.run();
} }
} }

View File

@ -205,6 +205,11 @@ public static WriteStableHow fromValue(int id) {
public static final String FILE_DUMP_DIR_DEFAULT = "/tmp/.hdfs-nfs"; public static final String FILE_DUMP_DIR_DEFAULT = "/tmp/.hdfs-nfs";
public static final String ENABLE_FILE_DUMP_KEY = "dfs.nfs3.enableDump"; public static final String ENABLE_FILE_DUMP_KEY = "dfs.nfs3.enableDump";
public static final boolean ENABLE_FILE_DUMP_DEFAULT = true; public static final boolean ENABLE_FILE_DUMP_DEFAULT = true;
public static final String MAX_OPEN_FILES = "dfs.nfs3.max.open.files";
public static final int MAX_OPEN_FILES_DEFAULT = 256;
public static final String OUTPUT_STREAM_TIMEOUT = "dfs.nfs3.stream.timeout";
public static final long OUTPUT_STREAM_TIMEOUT_DEFAULT = 10 * 60 * 1000; // 10 minutes
public static final long OUTPUT_STREAM_TIMEOUT_MIN_DEFAULT = 10 * 1000; //10 seconds
public final static String UNKNOWN_USER = "nobody"; public final static String UNKNOWN_USER = "nobody";
public final static String UNKNOWN_GROUP = "nobody"; public final static String UNKNOWN_GROUP = "nobody";

View File

@ -83,4 +83,10 @@ public void serialize(XDR xdr) {
xdr.writeInt(count); xdr.writeInt(count);
xdr.writeFixedOpaque(data.array(), count); xdr.writeFixedOpaque(data.array(), count);
} }
@Override
public String toString() {
return String.format("fileId: %d offset: %d count: %d stableHow: %s",
handle.getFileId(), offset, count, stableHow.name());
}
} }

View File

@ -100,6 +100,9 @@ protected void register(PortmapMapping mapEntry) {
} }
} }
// Start extra daemons
public void startDaemons() {}
@Override @Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
throws Exception { throws Exception {

View File

@ -23,33 +23,47 @@
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.nfs.mount.Mountd; import org.apache.hadoop.hdfs.nfs.mount.Mountd;
import org.apache.hadoop.mount.MountdBase;
import org.apache.hadoop.nfs.nfs3.Nfs3Base; import org.apache.hadoop.nfs.nfs3.Nfs3Base;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import com.google.common.annotations.VisibleForTesting;
/** /**
* Nfs server. Supports NFS v3 using {@link RpcProgramNfs3}. * Nfs server. Supports NFS v3 using {@link RpcProgramNfs3}.
* Currently Mountd program is also started inside this class. * Currently Mountd program is also started inside this class.
* Only TCP server is supported and UDP is not supported. * Only TCP server is supported and UDP is not supported.
*/ */
public class Nfs3 extends Nfs3Base { public class Nfs3 extends Nfs3Base {
private Mountd mountd;
static { static {
Configuration.addDefaultResource("hdfs-default.xml"); Configuration.addDefaultResource("hdfs-default.xml");
Configuration.addDefaultResource("hdfs-site.xml"); Configuration.addDefaultResource("hdfs-site.xml");
} }
public Nfs3(List<String> exports) throws IOException { public Nfs3(List<String> exports) throws IOException {
super(new Mountd(exports), new RpcProgramNfs3()); super(new RpcProgramNfs3());
mountd = new Mountd(exports);
} }
@VisibleForTesting
public Nfs3(List<String> exports, Configuration config) throws IOException { public Nfs3(List<String> exports, Configuration config) throws IOException {
super(new Mountd(exports, config), new RpcProgramNfs3(config), config); super(new RpcProgramNfs3(config), config);
mountd = new Mountd(exports, config);
} }
public Mountd getMountd() {
return mountd;
}
public static void main(String[] args) throws IOException { public static void main(String[] args) throws IOException {
StringUtils.startupShutdownMessage(Nfs3.class, args, LOG); StringUtils.startupShutdownMessage(Nfs3.class, args, LOG);
List<String> exports = new ArrayList<String>(); List<String> exports = new ArrayList<String>();
exports.add("/"); exports.add("/");
final Nfs3 nfsServer = new Nfs3(exports); final Nfs3 nfsServer = new Nfs3(exports);
nfsServer.mountd.start(true); // Start mountd
nfsServer.start(true); nfsServer.start(true);
} }
} }

View File

@ -24,7 +24,6 @@
import java.io.RandomAccessFile; import java.io.RandomAccessFile;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException; import java.nio.channels.ClosedChannelException;
import java.security.InvalidParameterException;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.Map.Entry; import java.util.Map.Entry;
@ -96,7 +95,7 @@ static enum COMMIT_STATUS {
// It's updated after each sync to HDFS // It's updated after each sync to HDFS
private Nfs3FileAttributes latestAttr; private Nfs3FileAttributes latestAttr;
private final ConcurrentNavigableMap<OffsetRange, WriteCtx> pendingWrites; private final ConcurrentNavigableMap<OffsetRange, WriteCtx> pendingWrites;
private final ConcurrentNavigableMap<Long, CommitCtx> pendingCommits; private final ConcurrentNavigableMap<Long, CommitCtx> pendingCommits;
@ -165,10 +164,22 @@ private boolean checkStreamTimeout(long streamTimeout) {
return System.currentTimeMillis() - lastAccessTime > streamTimeout; return System.currentTimeMillis() - lastAccessTime > streamTimeout;
} }
long getLastAccessTime() {
return lastAccessTime;
}
public long getNextOffset() { public long getNextOffset() {
return nextOffset.get(); return nextOffset.get();
} }
boolean getActiveState() {
return this.activeState;
}
boolean hasPendingWork() {
return (pendingWrites.size() != 0 || pendingCommits.size() != 0);
}
// Increase or decrease the memory occupation of non-sequential writes // Increase or decrease the memory occupation of non-sequential writes
private long updateNonSequentialWriteInMemory(long count) { private long updateNonSequentialWriteInMemory(long count) {
long newValue = nonSequentialWriteInMemory.addAndGet(count); long newValue = nonSequentialWriteInMemory.addAndGet(count);
@ -792,19 +803,18 @@ private void addWrite(WriteCtx writeCtx) {
* @return true, remove stream; false, keep stream * @return true, remove stream; false, keep stream
*/ */
public synchronized boolean streamCleanup(long fileId, long streamTimeout) { public synchronized boolean streamCleanup(long fileId, long streamTimeout) {
if (streamTimeout < WriteManager.MINIMIUM_STREAM_TIMEOUT) { Preconditions
throw new InvalidParameterException("StreamTimeout" + streamTimeout .checkState(streamTimeout >= Nfs3Constant.OUTPUT_STREAM_TIMEOUT_MIN_DEFAULT);
+ "ms is less than MINIMIUM_STREAM_TIMEOUT " if (!activeState) {
+ WriteManager.MINIMIUM_STREAM_TIMEOUT + "ms"); return true;
} }
boolean flag = false; boolean flag = false;
// Check the stream timeout // Check the stream timeout
if (checkStreamTimeout(streamTimeout)) { if (checkStreamTimeout(streamTimeout)) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("closing stream for fileId:" + fileId); LOG.debug("stream can be closed for fileId:" + fileId);
} }
cleanup();
flag = true; flag = true;
} }
return flag; return flag;
@ -975,7 +985,7 @@ private void doSingleWrite(final WriteCtx writeCtx) {
FileHandle handle = writeCtx.getHandle(); FileHandle handle = writeCtx.getHandle();
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("do write, fileId: " + handle.getFileId() + " offset: " LOG.debug("do write, fileId: " + handle.getFileId() + " offset: "
+ offset + " length:" + count + " stableHow:" + stableHow.getValue()); + offset + " length:" + count + " stableHow:" + stableHow.name());
} }
try { try {
@ -1056,7 +1066,7 @@ private void doSingleWrite(final WriteCtx writeCtx) {
} }
} }
private synchronized void cleanup() { synchronized void cleanup() {
if (!activeState) { if (!activeState) {
LOG.info("Current OpenFileCtx is already inactive, no need to cleanup."); LOG.info("Current OpenFileCtx is already inactive, no need to cleanup.");
return; return;
@ -1064,7 +1074,7 @@ private synchronized void cleanup() {
activeState = false; activeState = false;
// stop the dump thread // stop the dump thread
if (dumpThread != null) { if (dumpThread != null && dumpThread.isAlive()) {
dumpThread.interrupt(); dumpThread.interrupt();
try { try {
dumpThread.join(3000); dumpThread.join(3000);
@ -1146,4 +1156,10 @@ void setNextOffsetForTest(long newValue) {
void setActiveStatusForTest(boolean activeState) { void setActiveStatusForTest(boolean activeState) {
this.activeState = activeState; this.activeState = activeState;
} }
@Override
public String toString() {
return String.format("activeState: %b asyncStatus: %b nextOffset: %d",
activeState, asyncStatus, nextOffset.get());
}
} }

View File

@ -0,0 +1,270 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.nfs.nfs3;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.nfs.nfs3.FileHandle;
import org.apache.hadoop.nfs.nfs3.Nfs3Constant;
import org.apache.hadoop.util.Daemon;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
/**
 * A cache saves OpenFileCtx objects for different users. Each cache entry is
 * used to maintain the writing context for a single file.
 * <p>
 * Entries are added and removed while holding this object's monitor. The
 * cache calls {@code cleanup()} on a removed context only after it has left
 * the synchronized block.
 */
class OpenFileCtxCache {
  private static final Log LOG = LogFactory.getLog(OpenFileCtxCache.class);
  // Insert and delete with openFileMap are synced
  private final ConcurrentMap<FileHandle, OpenFileCtx> openFileMap = Maps
      .newConcurrentMap();

  /** Upper bound on the number of cached contexts, read from configuration. */
  private final int maxStreams;
  /** Timeout handed to {@link #scan(long)} by the monitor thread. */
  private final long streamTimeout;
  private final StreamMonitor streamMonitor;

  /**
   * Creates the cache. The monitor thread is created here but is not started
   * until {@link #start()} is called.
   *
   * @param config source of {@link Nfs3Constant#MAX_OPEN_FILES}
   * @param streamTimeout timeout passed to each periodic scan
   */
  OpenFileCtxCache(Configuration config, long streamTimeout) {
    maxStreams = config.getInt(Nfs3Constant.MAX_OPEN_FILES,
        Nfs3Constant.MAX_OPEN_FILES_DEFAULT);
    LOG.info("Maximum open streams is " + maxStreams);
    this.streamTimeout = streamTimeout;
    streamMonitor = new StreamMonitor();
  }

  /**
   * The entry to be evicted is based on the following rules:<br>
   * 1. if the OpenFileCtx has any pending task, it will not be chosen.<br>
   * 2. if there is inactive OpenFileCtx, the first found one is to evict. <br>
   * 3. For OpenFileCtx entries don't belong to group 1 or 2, the idlest one
   * is select. If it's idle longer than OUTPUT_STREAM_TIMEOUT_MIN_DEFAULT, it
   * will be evicted. Otherwise, the whole eviction request is failed.
   *
   * @return the entry to evict, or null when no entry qualifies
   */
  @VisibleForTesting
  Entry<FileHandle, OpenFileCtx> getEntryToEvict() {
    if (LOG.isTraceEnabled()) {
      LOG.trace("openFileMap size:" + openFileMap.size());
    }

    Entry<FileHandle, OpenFileCtx> idlest = null;
    for (Entry<FileHandle, OpenFileCtx> pairs : openFileMap.entrySet()) {
      OpenFileCtx ctx = pairs.getValue();
      // The inactive check comes first: an inactive context is returned
      // immediately, before its pending work is looked at.
      if (!ctx.getActiveState()) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Got one inactive stream: " + ctx);
        }
        return pairs;
      }
      if (ctx.hasPendingWork()) {
        // Always skip files with pending work.
        continue;
      }
      // Track the entry with the smallest last-access time seen so far.
      if (idlest == null
          || ctx.getLastAccessTime() < idlest.getValue().getLastAccessTime()) {
        idlest = pairs;
      }
    }

    if (idlest == null) {
      LOG.warn("No eviction candidate. All streams have pending work.");
      return null;
    }

    long idleTime = System.currentTimeMillis()
        - idlest.getValue().getLastAccessTime();
    if (idleTime < Nfs3Constant.OUTPUT_STREAM_TIMEOUT_MIN_DEFAULT) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("idlest stream's idle time:" + idleTime);
      }
      LOG.warn("All opened streams are busy, can't remove any from cache.");
      return null;
    }
    return idlest;
  }

  /**
   * Adds a context to the cache. When the cache already holds
   * {@code maxStreams} entries, one entry chosen by
   * {@link #getEntryToEvict()} is removed first and cleaned up after the lock
   * is released.
   *
   * @param h file handle used as the cache key
   * @param context writing context to cache
   * @return true if the context was added; false if the cache is full and no
   *         entry could be evicted
   * @throws IllegalStateException if the cache holds more than
   *           {@code maxStreams} entries
   */
  boolean put(FileHandle h, OpenFileCtx context) {
    OpenFileCtx toEvict = null;
    synchronized (this) {
      Preconditions.checkState(openFileMap.size() <= this.maxStreams,
          "stream cache size " + openFileMap.size()
              + " is larger than maximum" + this.maxStreams);
      if (openFileMap.size() == this.maxStreams) {
        Entry<FileHandle, OpenFileCtx> pairs = getEntryToEvict();
        if (pairs == null) {
          return false;
        }
        if (LOG.isDebugEnabled()) {
          LOG.debug("Evict stream ctx: " + pairs.getValue());
        }
        toEvict = openFileMap.remove(pairs.getKey());
        Preconditions.checkState(toEvict == pairs.getValue(),
            "The deleted entry is not the same as odlest found.");
      }
      openFileMap.put(h, context);
    }

    // Cleanup the old stream outside the lock
    if (toEvict != null) {
      toEvict.cleanup();
    }
    return true;
  }

  /**
   * Removes every cached context whose {@code streamCleanup} call returns
   * true for the given timeout, then invokes {@code cleanup()} on each
   * removed context.
   *
   * @param streamTimeout timeout forwarded to {@code streamCleanup}
   */
  @VisibleForTesting
  void scan(long streamTimeout) {
    ArrayList<OpenFileCtx> ctxToRemove = new ArrayList<OpenFileCtx>();
    if (LOG.isTraceEnabled()) {
      LOG.trace("openFileMap size:" + openFileMap.size());
    }

    for (Entry<FileHandle, OpenFileCtx> pairs : openFileMap.entrySet()) {
      FileHandle handle = pairs.getKey();
      OpenFileCtx ctx = pairs.getValue();
      // First check is done without the cache lock.
      if (!ctx.streamCleanup(handle.getFileId(), streamTimeout)) {
        continue;
      }

      // Check it again inside lock before removing
      synchronized (this) {
        OpenFileCtx ctx2 = openFileMap.get(handle);
        if (ctx2 != null
            && ctx2.streamCleanup(handle.getFileId(), streamTimeout)) {
          openFileMap.remove(handle);
          if (LOG.isDebugEnabled()) {
            LOG.debug("After remove stream " + handle.getFileId()
                + ", the stream number:" + openFileMap.size());
          }
          ctxToRemove.add(ctx2);
        }
      }
    }

    // Invoke the cleanup outside the lock
    for (OpenFileCtx ofc : ctxToRemove) {
      ofc.cleanup();
    }
  }

  /**
   * @param key file handle to look up
   * @return the cached context for the handle, or null if there is none
   */
  OpenFileCtx get(FileHandle key) {
    return openFileMap.get(key);
  }

  /** @return the number of cached contexts */
  int size() {
    return openFileMap.size();
  }

  /** Starts the monitor thread that periodically scans the cache. */
  void start() {
    streamMonitor.start();
  }

  // Evict all entries
  void cleanAll() {
    ArrayList<OpenFileCtx> cleanedContext = new ArrayList<OpenFileCtx>();
    synchronized (this) {
      Iterator<Entry<FileHandle, OpenFileCtx>> it = openFileMap.entrySet()
          .iterator();
      if (LOG.isTraceEnabled()) {
        LOG.trace("openFileMap size:" + openFileMap.size());
      }

      while (it.hasNext()) {
        Entry<FileHandle, OpenFileCtx> pairs = it.next();
        OpenFileCtx ctx = pairs.getValue();
        it.remove();
        cleanedContext.add(ctx);
      }
    }

    // Invoke the cleanup outside the lock
    for (OpenFileCtx ofc : cleanedContext) {
      ofc.cleanup();
    }
  }

  /**
   * Stops the monitor thread, waiting up to 3 seconds for it to exit, and
   * then evicts and cleans up every cached context.
   */
  void shutdown() {
    boolean interrupted = false;
    // stop the monitor thread
    if (streamMonitor.isAlive()) {
      streamMonitor.shouldRun(false);
      streamMonitor.interrupt();
      try {
        streamMonitor.join(3000);
      } catch (InterruptedException e) {
        // Do not lose the caller's interrupt; it is restored below, after
        // cleanAll(), so the cleanup is not run with the flag already set.
        interrupted = true;
      }
    }

    cleanAll();

    if (interrupted) {
      Thread.currentThread().interrupt();
    }
  }

  /**
   * StreamMonitor wakes up periodically to find and closes idle streams.
   */
  class StreamMonitor extends Daemon {
    private final static int rotation = 5 * 1000; // 5 seconds
    // Starts at 0, so the first pass through the loop never sleeps.
    private long lastWakeupTime = 0;
    // Written by the thread calling shutdown() and read by the monitor
    // thread, so it must be volatile for the stop request to be visible.
    private volatile boolean shouldRun = true;

    /**
     * @param shouldRun false to make the monitor loop exit at its next check
     */
    void shouldRun(boolean shouldRun) {
      this.shouldRun = shouldRun;
    }

    @Override
    public void run() {
      while (shouldRun) {
        scan(streamTimeout);

        // Check if it can sleep
        try {
          long workedTime = System.currentTimeMillis() - lastWakeupTime;
          if (workedTime < rotation) {
            if (LOG.isTraceEnabled()) {
              LOG.trace("StreamMonitor can still have a sleep:"
                  + ((rotation - workedTime) / 1000));
            }
            Thread.sleep(rotation - workedTime);
          }
          lastWakeupTime = System.currentTimeMillis();

        } catch (InterruptedException e) {
          LOG.info("StreamMonitor got interrupted");
          return;
        }
      }
    }
  }
}

View File

@ -214,6 +214,11 @@ private void clearDirectory(String writeDumpDir) throws IOException {
} }
} }
@Override
public void startDaemons() {
writeManager.startAsyncDataSerivce();
}
/****************************************************** /******************************************************
* RPC call handlers * RPC call handlers
******************************************************/ ******************************************************/
@ -778,7 +783,8 @@ public CREATE3Response create(XDR xdr, SecurityHandler securityHandler,
int createMode = request.getMode(); int createMode = request.getMode();
if ((createMode != Nfs3Constant.CREATE_EXCLUSIVE) if ((createMode != Nfs3Constant.CREATE_EXCLUSIVE)
&& request.getObjAttr().getUpdateFields().contains(SetAttrField.SIZE)) { && request.getObjAttr().getUpdateFields().contains(SetAttrField.SIZE)
&& request.getObjAttr().getSize() != 0) {
LOG.error("Setting file size is not supported when creating file: " LOG.error("Setting file size is not supported when creating file: "
+ fileName + " dir fileId:" + dirHandle.getFileId()); + fileName + " dir fileId:" + dirHandle.getFileId());
return new CREATE3Response(Nfs3Status.NFS3ERR_INVAL); return new CREATE3Response(Nfs3Status.NFS3ERR_INVAL);
@ -831,6 +837,23 @@ preOpDirAttr, new WccData(Nfs3Utils.getWccAttr(preOpDirAttr),
postOpObjAttr = Nfs3Utils.getFileAttr(dfsClient, fileIdPath, iug); postOpObjAttr = Nfs3Utils.getFileAttr(dfsClient, fileIdPath, iug);
dirWcc = Nfs3Utils.createWccData(Nfs3Utils.getWccAttr(preOpDirAttr), dirWcc = Nfs3Utils.createWccData(Nfs3Utils.getWccAttr(preOpDirAttr),
dfsClient, dirFileIdPath, iug); dfsClient, dirFileIdPath, iug);
// Add open stream
OpenFileCtx openFileCtx = new OpenFileCtx(fos, postOpObjAttr,
writeDumpDir + "/" + postOpObjAttr.getFileId(), dfsClient, iug);
fileHandle = new FileHandle(postOpObjAttr.getFileId());
if (!writeManager.addOpenFileStream(fileHandle, openFileCtx)) {
LOG.warn("Can't add more stream, close it."
+ " Future write will become append");
fos.close();
fos = null;
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Opened stream for file:" + fileName + ", fileId:"
+ fileHandle.getFileId());
}
}
} catch (IOException e) { } catch (IOException e) {
LOG.error("Exception", e); LOG.error("Exception", e);
if (fos != null) { if (fos != null) {
@ -859,16 +882,6 @@ preOpDirAttr, new WccData(Nfs3Utils.getWccAttr(preOpDirAttr),
} }
} }
// Add open stream
OpenFileCtx openFileCtx = new OpenFileCtx(fos, postOpObjAttr, writeDumpDir
+ "/" + postOpObjAttr.getFileId(), dfsClient, iug);
fileHandle = new FileHandle(postOpObjAttr.getFileId());
writeManager.addOpenFileStream(fileHandle, openFileCtx);
if (LOG.isDebugEnabled()) {
LOG.debug("open stream for file:" + fileName + ", fileId:"
+ fileHandle.getFileId());
}
return new CREATE3Response(Nfs3Status.NFS3_OK, fileHandle, postOpObjAttr, return new CREATE3Response(Nfs3Status.NFS3_OK, fileHandle, postOpObjAttr,
dirWcc); dirWcc);
} }

View File

@ -18,8 +18,6 @@
package org.apache.hadoop.hdfs.nfs.nfs3; package org.apache.hadoop.hdfs.nfs.nfs3;
import java.io.IOException; import java.io.IOException;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
@ -29,11 +27,12 @@
import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.nfs.nfs3.OpenFileCtx.COMMIT_STATUS; import org.apache.hadoop.hdfs.nfs.nfs3.OpenFileCtx.COMMIT_STATUS;
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.nfs.NfsFileType; import org.apache.hadoop.nfs.NfsFileType;
import org.apache.hadoop.nfs.nfs3.FileHandle; import org.apache.hadoop.nfs.nfs3.FileHandle;
import org.apache.hadoop.nfs.nfs3.IdUserGroup; import org.apache.hadoop.nfs.nfs3.IdUserGroup;
import org.apache.hadoop.nfs.nfs3.Nfs3Constant; import org.apache.hadoop.nfs.nfs3.Nfs3Constant;
import org.apache.hadoop.nfs.nfs3.Nfs3Constant.WriteStableHow;
import org.apache.hadoop.nfs.nfs3.Nfs3FileAttributes; import org.apache.hadoop.nfs.nfs3.Nfs3FileAttributes;
import org.apache.hadoop.nfs.nfs3.Nfs3Status; import org.apache.hadoop.nfs.nfs3.Nfs3Status;
import org.apache.hadoop.nfs.nfs3.request.WRITE3Request; import org.apache.hadoop.nfs.nfs3.request.WRITE3Request;
@ -56,69 +55,70 @@ public class WriteManager {
private final Configuration config; private final Configuration config;
private final IdUserGroup iug; private final IdUserGroup iug;
private final ConcurrentMap<FileHandle, OpenFileCtx> openFileMap = Maps
.newConcurrentMap();
private AsyncDataService asyncDataService; private AsyncDataService asyncDataService;
private boolean asyncDataServiceStarted = false; private boolean asyncDataServiceStarted = false;
private final StreamMonitor streamMonitor; private final int maxStreams;
/** /**
* The time limit to wait for accumulate reordered sequential writes to the * The time limit to wait for accumulate reordered sequential writes to the
* same file before the write is considered done. * same file before the write is considered done.
*/ */
private long streamTimeout; private long streamTimeout;
public static final long DEFAULT_STREAM_TIMEOUT = 10 * 60 * 1000; //10 minutes private final OpenFileCtxCache fileContextCache;
public static final long MINIMIUM_STREAM_TIMEOUT = 10 * 1000; //10 seconds
static public class MultipleCachedStreamException extends IOException {
void addOpenFileStream(FileHandle h, OpenFileCtx ctx) { private static final long serialVersionUID = 1L;
openFileMap.put(h, ctx);
if (LOG.isDebugEnabled()) { public MultipleCachedStreamException(String msg) {
LOG.debug("After add the new stream " + h.getFileId() super(msg);
+ ", the stream number:" + openFileMap.size());
} }
} }
boolean addOpenFileStream(FileHandle h, OpenFileCtx ctx) {
return fileContextCache.put(h, ctx);
}
WriteManager(IdUserGroup iug, final Configuration config) { WriteManager(IdUserGroup iug, final Configuration config) {
this.iug = iug; this.iug = iug;
this.config = config; this.config = config;
streamTimeout = config.getLong(Nfs3Constant.OUTPUT_STREAM_TIMEOUT,
streamTimeout = config.getLong("dfs.nfs3.stream.timeout", Nfs3Constant.OUTPUT_STREAM_TIMEOUT_DEFAULT);
DEFAULT_STREAM_TIMEOUT);
LOG.info("Stream timeout is " + streamTimeout + "ms."); LOG.info("Stream timeout is " + streamTimeout + "ms.");
if (streamTimeout < MINIMIUM_STREAM_TIMEOUT) { if (streamTimeout < Nfs3Constant.OUTPUT_STREAM_TIMEOUT_MIN_DEFAULT) {
LOG.info("Reset stream timeout to minimum value " LOG.info("Reset stream timeout to minimum value "
+ MINIMIUM_STREAM_TIMEOUT + "ms."); + Nfs3Constant.OUTPUT_STREAM_TIMEOUT_MIN_DEFAULT + "ms.");
streamTimeout = MINIMIUM_STREAM_TIMEOUT; streamTimeout = Nfs3Constant.OUTPUT_STREAM_TIMEOUT_MIN_DEFAULT;
} }
maxStreams = config.getInt(Nfs3Constant.MAX_OPEN_FILES,
this.streamMonitor = new StreamMonitor(); Nfs3Constant.MAX_OPEN_FILES_DEFAULT);
LOG.info("Maximum open streams is "+ maxStreams);
this.fileContextCache = new OpenFileCtxCache(config, streamTimeout);
} }
private void startAsyncDataSerivce() { void startAsyncDataSerivce() {
streamMonitor.start(); if (asyncDataServiceStarted) {
return;
}
fileContextCache.start();
this.asyncDataService = new AsyncDataService(); this.asyncDataService = new AsyncDataService();
asyncDataServiceStarted = true; asyncDataServiceStarted = true;
} }
private void shutdownAsyncDataService() { void shutdownAsyncDataService() {
asyncDataService.shutdown(); if (!asyncDataServiceStarted) {
return;
}
asyncDataServiceStarted = false; asyncDataServiceStarted = false;
streamMonitor.interrupt(); asyncDataService.shutdown();
fileContextCache.shutdown();
} }
void handleWrite(DFSClient dfsClient, WRITE3Request request, Channel channel, void handleWrite(DFSClient dfsClient, WRITE3Request request, Channel channel,
int xid, Nfs3FileAttributes preOpAttr) throws IOException { int xid, Nfs3FileAttributes preOpAttr) throws IOException {
// First write request starts the async data service
if (!asyncDataServiceStarted) {
startAsyncDataSerivce();
}
long offset = request.getOffset();
int count = request.getCount(); int count = request.getCount();
WriteStableHow stableHow = request.getStableHow();
byte[] data = request.getData().array(); byte[] data = request.getData().array();
if (data.length < count) { if (data.length < count) {
WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_INVAL); WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_INVAL);
@ -129,13 +129,12 @@ void handleWrite(DFSClient dfsClient, WRITE3Request request, Channel channel,
FileHandle handle = request.getHandle(); FileHandle handle = request.getHandle();
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("handleWrite fileId: " + handle.getFileId() + " offset: " LOG.debug("handleWrite " + request);
+ offset + " length:" + count + " stableHow:" + stableHow.getValue());
} }
// Check if there is a stream to write // Check if there is a stream to write
FileHandle fileHandle = request.getHandle(); FileHandle fileHandle = request.getHandle();
OpenFileCtx openFileCtx = openFileMap.get(fileHandle); OpenFileCtx openFileCtx = fileContextCache.get(fileHandle);
if (openFileCtx == null) { if (openFileCtx == null) {
LOG.info("No opened stream for fileId:" + fileHandle.getFileId()); LOG.info("No opened stream for fileId:" + fileHandle.getFileId());
@ -150,6 +149,15 @@ void handleWrite(DFSClient dfsClient, WRITE3Request request, Channel channel,
fos = dfsClient.append(fileIdPath, bufferSize, null, null); fos = dfsClient.append(fileIdPath, bufferSize, null, null);
latestAttr = Nfs3Utils.getFileAttr(dfsClient, fileIdPath, iug); latestAttr = Nfs3Utils.getFileAttr(dfsClient, fileIdPath, iug);
} catch (RemoteException e) {
IOException io = e.unwrapRemoteException();
if (io instanceof AlreadyBeingCreatedException) {
LOG.warn("Can't append file:" + fileIdPath
+ ". Possibly the file is being closed. Drop the request:"
+ request + ", wait for the client to retry...");
return;
}
throw e;
} catch (IOException e) { } catch (IOException e) {
LOG.error("Can't apapend to file:" + fileIdPath + ", error:" + e); LOG.error("Can't apapend to file:" + fileIdPath + ", error:" + e);
if (fos != null) { if (fos != null) {
@ -170,9 +178,26 @@ void handleWrite(DFSClient dfsClient, WRITE3Request request, Channel channel,
Nfs3Constant.FILE_DUMP_DIR_DEFAULT); Nfs3Constant.FILE_DUMP_DIR_DEFAULT);
openFileCtx = new OpenFileCtx(fos, latestAttr, writeDumpDir + "/" openFileCtx = new OpenFileCtx(fos, latestAttr, writeDumpDir + "/"
+ fileHandle.getFileId(), dfsClient, iug); + fileHandle.getFileId(), dfsClient, iug);
addOpenFileStream(fileHandle, openFileCtx);
if (!addOpenFileStream(fileHandle, openFileCtx)) {
LOG.info("Can't add new stream. Close it. Tell client to retry.");
try {
fos.close();
} catch (IOException e) {
LOG.error("Can't close stream for fileId:" + handle.getFileId());
}
// Notify client to retry
WccData fileWcc = new WccData(latestAttr.getWccAttr(), latestAttr);
WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_JUKEBOX,
fileWcc, 0, request.getStableHow(), Nfs3Constant.WRITE_COMMIT_VERF);
Nfs3Utils.writeChannel(channel,
response.writeHeaderAndResponse(new XDR(), xid, new VerifierNone()),
xid);
return;
}
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("opened stream for file:" + fileHandle.getFileId()); LOG.debug("Opened stream for appending file:" + fileHandle.getFileId());
} }
} }
@ -185,7 +210,7 @@ void handleWrite(DFSClient dfsClient, WRITE3Request request, Channel channel,
void handleCommit(DFSClient dfsClient, FileHandle fileHandle, void handleCommit(DFSClient dfsClient, FileHandle fileHandle,
long commitOffset, Channel channel, int xid, Nfs3FileAttributes preOpAttr) { long commitOffset, Channel channel, int xid, Nfs3FileAttributes preOpAttr) {
int status; int status;
OpenFileCtx openFileCtx = openFileMap.get(fileHandle); OpenFileCtx openFileCtx = fileContextCache.get(fileHandle);
if (openFileCtx == null) { if (openFileCtx == null) {
LOG.info("No opened stream for fileId:" + fileHandle.getFileId() LOG.info("No opened stream for fileId:" + fileHandle.getFileId()
@ -238,7 +263,7 @@ Nfs3FileAttributes getFileAttr(DFSClient client, FileHandle fileHandle,
String fileIdPath = Nfs3Utils.getFileIdPath(fileHandle); String fileIdPath = Nfs3Utils.getFileIdPath(fileHandle);
Nfs3FileAttributes attr = Nfs3Utils.getFileAttr(client, fileIdPath, iug); Nfs3FileAttributes attr = Nfs3Utils.getFileAttr(client, fileIdPath, iug);
if (attr != null) { if (attr != null) {
OpenFileCtx openFileCtx = openFileMap.get(fileHandle); OpenFileCtx openFileCtx = fileContextCache.get(fileHandle);
if (openFileCtx != null) { if (openFileCtx != null) {
attr.setSize(openFileCtx.getNextOffset()); attr.setSize(openFileCtx.getNextOffset());
attr.setUsed(openFileCtx.getNextOffset()); attr.setUsed(openFileCtx.getNextOffset());
@ -253,8 +278,8 @@ Nfs3FileAttributes getFileAttr(DFSClient client, FileHandle dirHandle,
Nfs3FileAttributes attr = Nfs3Utils.getFileAttr(client, fileIdPath, iug); Nfs3FileAttributes attr = Nfs3Utils.getFileAttr(client, fileIdPath, iug);
if ((attr != null) && (attr.getType() == NfsFileType.NFSREG.toValue())) { if ((attr != null) && (attr.getType() == NfsFileType.NFSREG.toValue())) {
OpenFileCtx openFileCtx = openFileMap OpenFileCtx openFileCtx = fileContextCache.get(new FileHandle(attr
.get(new FileHandle(attr.getFileId())); .getFileId()));
if (openFileCtx != null) { if (openFileCtx != null) {
attr.setSize(openFileCtx.getNextOffset()); attr.setSize(openFileCtx.getNextOffset());
@ -263,56 +288,9 @@ Nfs3FileAttributes getFileAttr(DFSClient client, FileHandle dirHandle,
} }
return attr; return attr;
} }
@VisibleForTesting @VisibleForTesting
ConcurrentMap<FileHandle, OpenFileCtx> getOpenFileMap() { OpenFileCtxCache getOpenFileCtxCache() {
return this.openFileMap; return this.fileContextCache;
}
/**
* StreamMonitor wakes up periodically to find and closes idle streams.
*/
class StreamMonitor extends Daemon {
private int rotation = 5 * 1000; // 5 seconds
private long lastWakeupTime = 0;
@Override
public void run() {
while (true) {
Iterator<Entry<FileHandle, OpenFileCtx>> it = openFileMap.entrySet()
.iterator();
if (LOG.isTraceEnabled()) {
LOG.trace("openFileMap size:" + openFileMap.size());
}
while (it.hasNext()) {
Entry<FileHandle, OpenFileCtx> pairs = it.next();
OpenFileCtx ctx = pairs.getValue();
if (ctx.streamCleanup((pairs.getKey()).getFileId(), streamTimeout)) {
it.remove();
if (LOG.isDebugEnabled()) {
LOG.debug("After remove stream " + pairs.getKey().getFileId()
+ ", the stream number:" + openFileMap.size());
}
}
}
// Check if it can sleep
try {
long workedTime = System.currentTimeMillis() - lastWakeupTime;
if (workedTime < rotation) {
if (LOG.isTraceEnabled()) {
LOG.trace("StreamMonitor can still have a sleep:"
+ ((rotation - workedTime) / 1000));
}
Thread.sleep(rotation - workedTime);
}
lastWakeupTime = System.currentTimeMillis();
} catch (InterruptedException e) {
LOG.info("StreamMonitor got interrupted");
return;
}
}
}
} }
} }

View File

@ -51,7 +51,7 @@ public void testStart() throws IOException {
Nfs3 nfs3 = new Nfs3(exports, config); Nfs3 nfs3 = new Nfs3(exports, config);
nfs3.start(false); nfs3.start(false);
RpcProgramMountd mountd = (RpcProgramMountd) nfs3.getMountBase() RpcProgramMountd mountd = (RpcProgramMountd) nfs3.getMountd()
.getRpcProgram(); .getRpcProgram();
mountd.nullOp(new XDR(), 1234, InetAddress.getByName("localhost")); mountd.nullOp(new XDR(), 1234, InetAddress.getByName("localhost"));

View File

@ -135,6 +135,7 @@ public WriteClient(String host, int port, XDR request, Boolean oneShot) {
@Override @Override
protected ChannelPipelineFactory setPipelineFactory() { protected ChannelPipelineFactory setPipelineFactory() {
this.pipelineFactory = new ChannelPipelineFactory() { this.pipelineFactory = new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() { public ChannelPipeline getPipeline() {
return Channels.pipeline( return Channels.pipeline(
RpcUtil.constructRpcFrameDecoder(), RpcUtil.constructRpcFrameDecoder(),

View File

@ -0,0 +1,141 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.nfs.nfs3;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.nfs.nfs3.OpenFileCtx.CommitCtx;
import org.apache.hadoop.nfs.nfs3.FileHandle;
import org.apache.hadoop.nfs.nfs3.IdUserGroup;
import org.apache.hadoop.nfs.nfs3.Nfs3Constant;
import org.apache.hadoop.nfs.nfs3.Nfs3FileAttributes;
import org.junit.Test;
import org.mockito.Mockito;
public class TestOpenFileCtxCache {
static boolean cleaned = false;
@Test
public void testEviction() throws IOException, InterruptedException {
Configuration conf = new Configuration();
// Only two entries will be in the cache
conf.setInt(Nfs3Constant.MAX_OPEN_FILES, 2);
DFSClient dfsClient = Mockito.mock(DFSClient.class);
Nfs3FileAttributes attr = new Nfs3FileAttributes();
HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class);
Mockito.when(fos.getPos()).thenReturn((long) 0);
OpenFileCtx context1 = new OpenFileCtx(fos, attr, "/dumpFilePath",
dfsClient, new IdUserGroup());
OpenFileCtx context2 = new OpenFileCtx(fos, attr, "/dumpFilePath",
dfsClient, new IdUserGroup());
OpenFileCtx context3 = new OpenFileCtx(fos, attr, "/dumpFilePath",
dfsClient, new IdUserGroup());
OpenFileCtx context4 = new OpenFileCtx(fos, attr, "/dumpFilePath",
dfsClient, new IdUserGroup());
OpenFileCtx context5 = new OpenFileCtx(fos, attr, "/dumpFilePath",
dfsClient, new IdUserGroup());
OpenFileCtxCache cache = new OpenFileCtxCache(conf, 10 * 60 * 100);
boolean ret = cache.put(new FileHandle(1), context1);
assertTrue(ret);
Thread.sleep(1000);
ret = cache.put(new FileHandle(2), context2);
assertTrue(ret);
ret = cache.put(new FileHandle(3), context3);
assertFalse(ret);
assertTrue(cache.size() == 2);
// Wait for the oldest stream to be evict-able, insert again
Thread.sleep(Nfs3Constant.OUTPUT_STREAM_TIMEOUT_MIN_DEFAULT);
assertTrue(cache.size() == 2);
ret = cache.put(new FileHandle(3), context3);
assertTrue(ret);
assertTrue(cache.size() == 2);
assertTrue(cache.get(new FileHandle(1)) == null);
// Test inactive entry is evicted immediately
context3.setActiveStatusForTest(false);
ret = cache.put(new FileHandle(4), context4);
assertTrue(ret);
// Now the cache has context2 and context4
// Test eviction failure if all entries have pending work.
context2.getPendingWritesForTest().put(new OffsetRange(0, 100),
new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
context4.getPendingCommitsForTest().put(new Long(100),
new CommitCtx(0, null, 0, attr));
Thread.sleep(Nfs3Constant.OUTPUT_STREAM_TIMEOUT_MIN_DEFAULT);
ret = cache.put(new FileHandle(5), context5);
assertFalse(ret);
}
@Test
public void testScan() throws IOException, InterruptedException {
Configuration conf = new Configuration();
// Only two entries will be in the cache
conf.setInt(Nfs3Constant.MAX_OPEN_FILES, 2);
DFSClient dfsClient = Mockito.mock(DFSClient.class);
Nfs3FileAttributes attr = new Nfs3FileAttributes();
HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class);
Mockito.when(fos.getPos()).thenReturn((long) 0);
OpenFileCtx context1 = new OpenFileCtx(fos, attr, "/dumpFilePath",
dfsClient, new IdUserGroup());
OpenFileCtx context2 = new OpenFileCtx(fos, attr, "/dumpFilePath",
dfsClient, new IdUserGroup());
OpenFileCtx context3 = new OpenFileCtx(fos, attr, "/dumpFilePath",
dfsClient, new IdUserGroup());
OpenFileCtx context4 = new OpenFileCtx(fos, attr, "/dumpFilePath",
dfsClient, new IdUserGroup());
OpenFileCtxCache cache = new OpenFileCtxCache(conf, 10 * 60 * 100);
// Test cleaning expired entry
boolean ret = cache.put(new FileHandle(1), context1);
assertTrue(ret);
ret = cache.put(new FileHandle(2), context2);
assertTrue(ret);
Thread.sleep(Nfs3Constant.OUTPUT_STREAM_TIMEOUT_MIN_DEFAULT + 1);
cache.scan(Nfs3Constant.OUTPUT_STREAM_TIMEOUT_MIN_DEFAULT);
assertTrue(cache.size() == 0);
// Test cleaning inactive entry
ret = cache.put(new FileHandle(3), context3);
assertTrue(ret);
ret = cache.put(new FileHandle(4), context4);
assertTrue(ret);
context3.setActiveStatusForTest(false);
cache.scan(Nfs3Constant.OUTPUT_STREAM_TIMEOUT_DEFAULT);
assertTrue(cache.size() == 1);
assertTrue(cache.get(new FileHandle(3)) == null);
assertTrue(cache.get(new FileHandle(4)) != null);
}
}

View File

@ -186,9 +186,8 @@ public void testCheckCommit() throws IOException {
private void waitWrite(RpcProgramNfs3 nfsd, FileHandle handle, int maxWaitTime) private void waitWrite(RpcProgramNfs3 nfsd, FileHandle handle, int maxWaitTime)
throws InterruptedException { throws InterruptedException {
int waitedTime = 0; int waitedTime = 0;
ConcurrentMap<FileHandle, OpenFileCtx> openFileMap = nfsd.getWriteManager() OpenFileCtx ctx = nfsd.getWriteManager()
.getOpenFileMap(); .getOpenFileCtxCache().get(handle);
OpenFileCtx ctx = openFileMap.get(handle);
assertTrue(ctx != null); assertTrue(ctx != null);
do { do {
Thread.sleep(3000); Thread.sleep(3000);

View File

@ -596,6 +596,8 @@ Release 2.2.1 - UNRELEASED
HDFS-5252. Stable write is not handled correctly in someplace. (brandonli) HDFS-5252. Stable write is not handled correctly in someplace. (brandonli)
HDFS-5364. Add OpenFileCtx cache. (brandonli)
Release 2.2.0 - 2013-10-13 Release 2.2.0 - 2013-10-13
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES