HDFS-4971. Move IO operations out of locking in OpenFileCtx. Contributed by Jing Zhao and Brandon Li.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1525681 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jing Zhao 2013-09-23 20:02:38 +00:00
parent 743d8ae055
commit 28e3d09230
7 changed files with 566 additions and 454 deletions

View File

@ -97,7 +97,7 @@ synchronized void shutdown() {
void writeAsync(OpenFileCtx openFileCtx) { void writeAsync(OpenFileCtx openFileCtx) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Scheduling write back task for fileId: " LOG.debug("Scheduling write back task for fileId: "
+ openFileCtx.copyLatestAttr().getFileId()); + openFileCtx.getLatestAttr().getFileId());
} }
WriteBackTask wbTask = new WriteBackTask(openFileCtx); WriteBackTask wbTask = new WriteBackTask(openFileCtx);
execute(wbTask); execute(wbTask);
@ -125,7 +125,7 @@ OpenFileCtx getOpenFileCtx() {
public String toString() { public String toString() {
// Called in AsyncDataService.execute for displaying error messages. // Called in AsyncDataService.execute for displaying error messages.
return "write back data for fileId" return "write back data for fileId"
+ openFileCtx.copyLatestAttr().getFileId() + " with nextOffset " + openFileCtx.getLatestAttr().getFileId() + " with nextOffset "
+ openFileCtx.getNextOffset(); + openFileCtx.getNextOffset();
} }

View File

@ -17,19 +17,34 @@
*/ */
package org.apache.hadoop.hdfs.nfs.nfs3; package org.apache.hadoop.hdfs.nfs.nfs3;
import java.util.Comparator;
import com.google.common.base.Preconditions;
/** /**
* OffsetRange is the range of read/write request. A single point (e.g.,[5,5]) * OffsetRange is the range of read/write request. A single point (e.g.,[5,5])
* is not a valid range. * is not a valid range.
*/ */
public class OffsetRange implements Comparable<OffsetRange> { public class OffsetRange {
public static final Comparator<OffsetRange> ReverseComparatorOnMin =
new Comparator<OffsetRange>() {
@Override
public int compare(OffsetRange o1, OffsetRange o2) {
if (o1.getMin() == o2.getMin()) {
return o1.getMax() < o2.getMax() ?
1 : (o1.getMax() > o2.getMax() ? -1 : 0);
} else {
return o1.getMin() < o2.getMin() ? 1 : -1;
}
}
};
private final long min; private final long min;
private final long max; private final long max;
OffsetRange(long min, long max) { OffsetRange(long min, long max) {
if ((min >= max) || (min < 0) || (max < 0)) { Preconditions.checkArgument(min >= 0 && max >= 0 && min < max);
throw new IllegalArgumentException("Wrong offset range: (" + min + ","
+ max + ")");
}
this.min = min; this.min = min;
this.max = max; this.max = max;
} }
@ -49,24 +64,10 @@ public int hashCode() {
@Override @Override
public boolean equals(Object o) { public boolean equals(Object o) {
assert (o instanceof OffsetRange); if (o instanceof OffsetRange) {
OffsetRange range = (OffsetRange) o; OffsetRange range = (OffsetRange) o;
return (min == range.getMin()) && (max == range.getMax()); return (min == range.getMin()) && (max == range.getMax());
} }
return false;
private static int compareTo(long left, long right) {
if (left < right) {
return -1;
} else if (left > right) {
return 1;
} else {
return 0;
}
}
@Override
public int compareTo(OffsetRange other) {
final int d = compareTo(min, other.getMin());
return d != 0 ? d : compareTo(max, other.getMax());
} }
} }

View File

@ -22,12 +22,14 @@
import java.io.FileOutputStream; import java.io.FileOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.RandomAccessFile; import java.io.RandomAccessFile;
import java.nio.channels.ClosedChannelException;
import java.security.InvalidParameterException; import java.security.InvalidParameterException;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.SortedMap; import java.util.Map.Entry;
import java.util.TreeMap; import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -50,8 +52,11 @@
import org.apache.hadoop.nfs.nfs3.response.WccData; import org.apache.hadoop.nfs.nfs3.response.WccData;
import org.apache.hadoop.oncrpc.XDR; import org.apache.hadoop.oncrpc.XDR;
import org.apache.hadoop.oncrpc.security.VerifierNone; import org.apache.hadoop.oncrpc.security.VerifierNone;
import org.apache.hadoop.util.Daemon;
import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.Channel;
import com.google.common.base.Preconditions;
/** /**
* OpenFileCtx saves the context of one HDFS file output stream. Access to it is * OpenFileCtx saves the context of one HDFS file output stream. Access to it is
* synchronized by its member lock. * synchronized by its member lock.
@ -59,34 +64,42 @@
class OpenFileCtx { class OpenFileCtx {
public static final Log LOG = LogFactory.getLog(OpenFileCtx.class); public static final Log LOG = LogFactory.getLog(OpenFileCtx.class);
/** // Pending writes water mark for dump, 1MB
* Lock to synchronize OpenFileCtx changes. Thread should get this lock before private static long DUMP_WRITE_WATER_MARK = 1024 * 1024;
* any read/write operation to an OpenFileCtx object
*/ public final static int COMMIT_FINISHED = 0;
private final ReentrantLock ctxLock; public final static int COMMIT_WAIT = 1;
public final static int COMMIT_INACTIVE_CTX = 2;
public final static int COMMIT_INACTIVE_WITH_PENDING_WRITE = 3;
public final static int COMMIT_ERROR = 4;
// The stream status. False means the stream is closed. // The stream status. False means the stream is closed.
private boolean activeState; private volatile boolean activeState;
// The stream write-back status. True means one thread is doing write back. // The stream write-back status. True means one thread is doing write back.
private boolean asyncStatus; private volatile boolean asyncStatus;
/**
* The current offset of the file in HDFS. All the content before this offset
* has been written back to HDFS.
*/
private AtomicLong nextOffset;
private final HdfsDataOutputStream fos; private final HdfsDataOutputStream fos;
private final Nfs3FileAttributes latestAttr;
private long nextOffset;
private final SortedMap<OffsetRange, WriteCtx> pendingWrites; // TODO: make it mutable and update it after each writing back to HDFS
private final Nfs3FileAttributes latestAttr;
private final ConcurrentNavigableMap<OffsetRange, WriteCtx> pendingWrites;
// The last write, commit request or write-back event. Updating time to keep // The last write, commit request or write-back event. Updating time to keep
// output steam alive. // output steam alive.
private long lastAccessTime; private long lastAccessTime;
// Pending writes water mark for dump, 1MB private volatile boolean enabledDump;
private static int DUMP_WRITE_WATER_MARK = 1024 * 1024;
private FileOutputStream dumpOut; private FileOutputStream dumpOut;
private long nonSequentialWriteInMemory; private AtomicLong nonSequentialWriteInMemory;
private boolean enabledDump;
private RandomAccessFile raf; private RandomAccessFile raf;
private final String dumpFilePath; private final String dumpFilePath;
private Daemon dumpThread;
private void updateLastAccessTime() { private void updateLastAccessTime() {
lastAccessTime = System.currentTimeMillis(); lastAccessTime = System.currentTimeMillis();
@ -96,89 +109,50 @@ private boolean checkStreamTimeout(long streamTimeout) {
return System.currentTimeMillis() - lastAccessTime > streamTimeout; return System.currentTimeMillis() - lastAccessTime > streamTimeout;
} }
// Increase or decrease the memory occupation of non-sequential writes public long getNextOffset() {
private long updateNonSequentialWriteInMemory(long count) { return nextOffset.get();
nonSequentialWriteInMemory += count;
if (LOG.isDebugEnabled()) {
LOG.debug("Update nonSequentialWriteInMemory by " + count + " new value:"
+ nonSequentialWriteInMemory);
} }
if (nonSequentialWriteInMemory < 0) { // Increase or decrease the memory occupation of non-sequential writes
LOG.error("nonSequentialWriteInMemory is negative after update with count " private long updateNonSequentialWriteInMemory(long count) {
+ count); long newValue = nonSequentialWriteInMemory.addAndGet(count);
throw new IllegalArgumentException( if (LOG.isDebugEnabled()) {
LOG.debug("Update nonSequentialWriteInMemory by " + count + " new value:"
+ newValue);
}
Preconditions.checkState(newValue >= 0,
"nonSequentialWriteInMemory is negative after update with count " "nonSequentialWriteInMemory is negative after update with count "
+ count); + count);
} return newValue;
return nonSequentialWriteInMemory;
} }
OpenFileCtx(HdfsDataOutputStream fos, Nfs3FileAttributes latestAttr, OpenFileCtx(HdfsDataOutputStream fos, Nfs3FileAttributes latestAttr,
String dumpFilePath) { String dumpFilePath) {
this.fos = fos; this.fos = fos;
this.latestAttr = latestAttr; this.latestAttr = latestAttr;
pendingWrites = new TreeMap<OffsetRange, WriteCtx>(); // We use the ReverseComparatorOnMin as the comparator of the map. In this
// way, we first dump the data with larger offset. In the meanwhile, we
// retrieve the last element to write back to HDFS.
pendingWrites = new ConcurrentSkipListMap<OffsetRange, WriteCtx>(
OffsetRange.ReverseComparatorOnMin);
updateLastAccessTime(); updateLastAccessTime();
activeState = true; activeState = true;
asyncStatus = false; asyncStatus = false;
dumpOut = null; dumpOut = null;
raf = null; raf = null;
nonSequentialWriteInMemory = 0; nonSequentialWriteInMemory = new AtomicLong(0);
this.dumpFilePath = dumpFilePath; this.dumpFilePath = dumpFilePath;
enabledDump = dumpFilePath == null ? false: true; enabledDump = dumpFilePath == null ? false: true;
nextOffset = latestAttr.getSize(); nextOffset = new AtomicLong();
assert(nextOffset == this.fos.getPos()); nextOffset.set(latestAttr.getSize());
assert(nextOffset.get() == this.fos.getPos());
ctxLock = new ReentrantLock(true); dumpThread = null;
} }
private void lockCtx() { public Nfs3FileAttributes getLatestAttr() {
if (LOG.isTraceEnabled()) { return latestAttr;
StackTraceElement[] stacktrace = Thread.currentThread().getStackTrace();
StackTraceElement e = stacktrace[2];
String methodName = e.getMethodName();
LOG.trace("lock ctx, caller:" + methodName);
}
ctxLock.lock();
}
private void unlockCtx() {
ctxLock.unlock();
if (LOG.isTraceEnabled()) {
StackTraceElement[] stacktrace = Thread.currentThread().getStackTrace();
StackTraceElement e = stacktrace[2];
String methodName = e.getMethodName();
LOG.info("unlock ctx, caller:" + methodName);
}
}
// Make a copy of the latestAttr
public Nfs3FileAttributes copyLatestAttr() {
Nfs3FileAttributes ret;
lockCtx();
try {
ret = new Nfs3FileAttributes(latestAttr);
} finally {
unlockCtx();
}
return ret;
}
private long getNextOffsetUnprotected() {
assert(ctxLock.isLocked());
return nextOffset;
}
public long getNextOffset() {
long ret;
lockCtx();
try {
ret = getNextOffsetUnprotected();
} finally {
unlockCtx();
}
return ret;
} }
// Get flushed offset. Note that flushed data may not be persisted. // Get flushed offset. Note that flushed data may not be persisted.
@ -187,12 +161,7 @@ private long getFlushedOffset() {
} }
// Check if need to dump the new writes // Check if need to dump the new writes
private void checkDump(long count) { private void checkDump() {
assert (ctxLock.isLocked());
// Always update the in memory count
updateNonSequentialWriteInMemory(count);
if (!enabledDump) { if (!enabledDump) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Do nothing, dump is disabled."); LOG.debug("Do nothing, dump is disabled.");
@ -200,29 +169,54 @@ private void checkDump(long count) {
return; return;
} }
if (nonSequentialWriteInMemory < DUMP_WRITE_WATER_MARK) { if (nonSequentialWriteInMemory.get() < DUMP_WRITE_WATER_MARK) {
return; return;
} }
// wake up the dumper thread to dump the data
synchronized (this) {
if (nonSequentialWriteInMemory.get() >= DUMP_WRITE_WATER_MARK) {
if (LOG.isDebugEnabled()) {
LOG.debug("Asking dumper to dump...");
}
if (dumpThread == null) {
dumpThread = new Daemon(new Dumper());
dumpThread.start();
} else {
this.notifyAll();
}
}
}
}
class Dumper implements Runnable {
/** Dump data into a file */
private void dump() {
// Create dump outputstream for the first time // Create dump outputstream for the first time
if (dumpOut == null) { if (dumpOut == null) {
LOG.info("Create dump file:" + dumpFilePath); LOG.info("Create dump file:" + dumpFilePath);
File dumpFile = new File(dumpFilePath); File dumpFile = new File(dumpFilePath);
try { try {
if (dumpFile.exists()) { synchronized (this) {
LOG.fatal("The dump file should not exist:" + dumpFilePath); // check if alive again
throw new RuntimeException("The dump file should not exist:" Preconditions.checkState(dumpFile.createNewFile(),
+ dumpFilePath); "The dump file should not exist: %s", dumpFilePath);
}
dumpOut = new FileOutputStream(dumpFile); dumpOut = new FileOutputStream(dumpFile);
}
} catch (IOException e) { } catch (IOException e) {
LOG.error("Got failure when creating dump stream " + dumpFilePath LOG.error("Got failure when creating dump stream " + dumpFilePath, e);
+ " with error:" + e);
enabledDump = false; enabledDump = false;
IOUtils.cleanup(LOG, dumpOut); if (dumpOut != null) {
try {
dumpOut.close();
} catch (IOException e1) {
LOG.error("Can't close dump stream " + dumpFilePath, e);
}
}
return; return;
} }
} }
// Get raf for the first dump // Get raf for the first dump
if (raf == null) { if (raf == null) {
try { try {
@ -236,10 +230,13 @@ private void checkDump(long count) {
} }
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Start dump, current write number:" + pendingWrites.size()); LOG.debug("Start dump. Before dump, nonSequentialWriteInMemory == "
+ nonSequentialWriteInMemory.get());
} }
Iterator<OffsetRange> it = pendingWrites.keySet().iterator(); Iterator<OffsetRange> it = pendingWrites.keySet().iterator();
while (it.hasNext()) { while (activeState && it.hasNext()
&& nonSequentialWriteInMemory.get() > 0) {
OffsetRange key = it.next(); OffsetRange key = it.next();
WriteCtx writeCtx = pendingWrites.get(key); WriteCtx writeCtx = pendingWrites.get(key);
try { try {
@ -248,18 +245,35 @@ private void checkDump(long count) {
updateNonSequentialWriteInMemory(-dumpedDataSize); updateNonSequentialWriteInMemory(-dumpedDataSize);
} }
} catch (IOException e) { } catch (IOException e) {
LOG.error("Dump data failed:" + writeCtx + " with error:" + e); LOG.error("Dump data failed:" + writeCtx + " with error:" + e
+ " OpenFileCtx state:" + activeState);
// Disable dump // Disable dump
enabledDump = false; enabledDump = false;
return; return;
} }
} }
if (nonSequentialWriteInMemory != 0) {
LOG.fatal("After dump, nonSequentialWriteInMemory is not zero: " if (LOG.isDebugEnabled()) {
+ nonSequentialWriteInMemory); LOG.debug("After dump, nonSequentialWriteInMemory == "
throw new RuntimeException( + nonSequentialWriteInMemory.get());
"After dump, nonSequentialWriteInMemory is not zero: " }
+ nonSequentialWriteInMemory); }
@Override
public void run() {
while (activeState && enabledDump) {
if (nonSequentialWriteInMemory.get() >= DUMP_WRITE_WATER_MARK) {
dump();
}
synchronized (OpenFileCtx.this) {
if (nonSequentialWriteInMemory.get() < DUMP_WRITE_WATER_MARK) {
try {
OpenFileCtx.this.wait();
} catch (InterruptedException e) {
}
}
}
}
} }
} }
@ -284,17 +298,19 @@ public void receivedNewWrite(DFSClient dfsClient, WRITE3Request request,
Channel channel, int xid, AsyncDataService asyncDataService, Channel channel, int xid, AsyncDataService asyncDataService,
IdUserGroup iug) { IdUserGroup iug) {
lockCtx();
try {
if (!activeState) { if (!activeState) {
LOG.info("OpenFileCtx is inactive, fileId:" LOG.info("OpenFileCtx is inactive, fileId:"
+ request.getHandle().getFileId()); + request.getHandle().getFileId());
WccData fileWcc = new WccData(latestAttr.getWccAttr(), latestAttr); WccData fileWcc = new WccData(latestAttr.getWccAttr(), latestAttr);
WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_IO, WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_IO,
fileWcc, 0, request.getStableHow(), Nfs3Constant.WRITE_COMMIT_VERF); fileWcc, 0, request.getStableHow(), Nfs3Constant.WRITE_COMMIT_VERF);
Nfs3Utils.writeChannel(channel, response.writeHeaderAndResponse( Nfs3Utils.writeChannel(channel,
new XDR(), xid, new VerifierNone()), xid); response.writeHeaderAndResponse(new XDR(), xid, new VerifierNone()),
xid);
} else { } else {
// Update the write time first
updateLastAccessTime();
// Handle repeated write requests (same xid or not). // Handle repeated write requests (same xid or not).
// If already replied, send reply again. If not replied, drop the // If already replied, send reply again. If not replied, drop the
// repeated request. // repeated request.
@ -318,113 +334,159 @@ public void receivedNewWrite(DFSClient dfsClient, WRITE3Request request,
Nfs3Utils.writeChannel(channel, response.writeHeaderAndResponse( Nfs3Utils.writeChannel(channel, response.writeHeaderAndResponse(
new XDR(), xid, new VerifierNone()), xid); new XDR(), xid, new VerifierNone()), xid);
} }
updateLastAccessTime();
} else { } else {
// not a repeated write request
receivedNewWriteInternal(dfsClient, request, channel, xid, receivedNewWriteInternal(dfsClient, request, channel, xid,
asyncDataService, iug); asyncDataService, iug);
} }
} }
}
} finally { /**
unlockCtx(); * Creates and adds a WriteCtx into the pendingWrites map. This is a
* synchronized method to handle concurrent writes.
*
* @return A non-null {@link WriteCtx} instance if the incoming write
* request's offset >= nextOffset. Otherwise null.
*/
private synchronized WriteCtx addWritesToCache(WRITE3Request request,
Channel channel, int xid) {
long offset = request.getOffset();
int count = request.getCount();
long cachedOffset = nextOffset.get();
if (LOG.isDebugEnabled()) {
LOG.debug("requesed offset=" + offset + " and current offset="
+ cachedOffset);
}
// Fail non-append call
if (offset < cachedOffset) {
LOG.warn("(offset,count,nextOffset):" + "(" + offset + "," + count + ","
+ nextOffset + ")");
return null;
} else {
DataState dataState = offset == cachedOffset ? WriteCtx.DataState.NO_DUMP
: WriteCtx.DataState.ALLOW_DUMP;
WriteCtx writeCtx = new WriteCtx(request.getHandle(),
request.getOffset(), request.getCount(), request.getStableHow(),
request.getData().array(), channel, xid, false, dataState);
if (LOG.isDebugEnabled()) {
LOG.debug("Add new write to the list with nextOffset " + cachedOffset
+ " and requesed offset=" + offset);
}
if (writeCtx.getDataState() == WriteCtx.DataState.ALLOW_DUMP) {
// update the memory size
updateNonSequentialWriteInMemory(count);
}
// check if there is a WriteCtx with the same range in pendingWrites
WriteCtx oldWriteCtx = checkRepeatedWriteRequest(request, channel, xid);
if (oldWriteCtx == null) {
addWrite(writeCtx);
} else {
LOG.warn("Got a repeated request, same range, with xid:"
+ writeCtx.getXid());
}
return writeCtx;
} }
} }
private void receivedNewWriteInternal(DFSClient dfsClient, /** Process an overwrite write request */
WRITE3Request request, Channel channel, int xid, private void processOverWrite(DFSClient dfsClient, WRITE3Request request,
AsyncDataService asyncDataService, IdUserGroup iug) { Channel channel, int xid, IdUserGroup iug) {
WccData wccData = new WccData(latestAttr.getWccAttr(), null);
long offset = request.getOffset(); long offset = request.getOffset();
int count = request.getCount(); int count = request.getCount();
WriteStableHow stableHow = request.getStableHow(); WriteStableHow stableHow = request.getStableHow();
// Get file length, fail non-append call
WccAttr preOpAttr = latestAttr.getWccAttr();
if (LOG.isDebugEnabled()) {
LOG.debug("requesed offset=" + offset + " and current filesize="
+ preOpAttr.getSize());
}
long nextOffset = getNextOffsetUnprotected();
if (offset == nextOffset) {
LOG.info("Add to the list, update nextOffset and notify the writer,"
+ " nextOffset:" + nextOffset);
WriteCtx writeCtx = new WriteCtx(request.getHandle(),
request.getOffset(), request.getCount(), request.getStableHow(),
request.getData().array(), channel, xid, false, DataState.NO_DUMP);
addWrite(writeCtx);
// Create an async task and change openFileCtx status to indicate async
// task pending
if (!asyncStatus) {
asyncStatus = true;
asyncDataService.execute(new AsyncDataService.WriteBackTask(this));
}
// Update the write time first
updateLastAccessTime();
Nfs3FileAttributes postOpAttr = new Nfs3FileAttributes(latestAttr);
// Send response immediately for unstable write
if (request.getStableHow() == WriteStableHow.UNSTABLE) {
WccData fileWcc = new WccData(preOpAttr, postOpAttr);
WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK,
fileWcc, count, stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
Nfs3Utils.writeChannel(channel, response.writeHeaderAndResponse(
new XDR(), xid, new VerifierNone()), xid);
writeCtx.setReplied(true);
}
} else if (offset > nextOffset) {
LOG.info("Add new write to the list but not update nextOffset:"
+ nextOffset);
WriteCtx writeCtx = new WriteCtx(request.getHandle(),
request.getOffset(), request.getCount(), request.getStableHow(),
request.getData().array(), channel, xid, false, DataState.ALLOW_DUMP);
addWrite(writeCtx);
// Check if need to dump some pending requests to file
checkDump(request.getCount());
updateLastAccessTime();
Nfs3FileAttributes postOpAttr = new Nfs3FileAttributes(latestAttr);
// In test, noticed some Linux client sends a batch (e.g., 1MB)
// of reordered writes and won't send more writes until it gets
// responses of the previous batch. So here send response immediately for
// unstable non-sequential write
if (request.getStableHow() == WriteStableHow.UNSTABLE) {
WccData fileWcc = new WccData(preOpAttr, postOpAttr);
WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK,
fileWcc, count, stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
Nfs3Utils.writeChannel(channel, response.writeHeaderAndResponse(
new XDR(), xid, new VerifierNone()), xid);
writeCtx.setReplied(true);
}
} else {
// offset < nextOffset
LOG.warn("(offset,count,nextOffset):" + "(" + offset + "," + count + ","
+ nextOffset + ")");
WccData wccData = new WccData(preOpAttr, null);
WRITE3Response response; WRITE3Response response;
long cachedOffset = nextOffset.get();
if (offset + count > nextOffset) { if (offset + count > cachedOffset) {
LOG.warn("Haven't noticed any partial overwrite out of a sequential file" LOG.warn("Haven't noticed any partial overwrite for a sequential file"
+ "write requests, so treat it as a real random write, no support."); + " write requests. Treat it as a real random write, no support.");
response = new WRITE3Response(Nfs3Status.NFS3ERR_INVAL, wccData, 0, response = new WRITE3Response(Nfs3Status.NFS3ERR_INVAL, wccData, 0,
WriteStableHow.UNSTABLE, 0); WriteStableHow.UNSTABLE, 0);
} else { } else {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Process perfectOverWrite"); LOG.debug("Process perfectOverWrite");
} }
// TODO: let executor handle perfect overwrite
response = processPerfectOverWrite(dfsClient, offset, count, stableHow, response = processPerfectOverWrite(dfsClient, offset, count, stableHow,
request.getData().array(), request.getData().array(),
Nfs3Utils.getFileIdPath(request.getHandle()), wccData, iug); Nfs3Utils.getFileIdPath(request.getHandle()), wccData, iug);
} }
updateLastAccessTime(); updateLastAccessTime();
Nfs3Utils.writeChannel(channel, response.writeHeaderAndResponse( Nfs3Utils.writeChannel(channel,
new XDR(), xid, new VerifierNone()), xid); response.writeHeaderAndResponse(new XDR(), xid, new VerifierNone()),
xid);
}
/**
* Check if we can start the write (back to HDFS) now. If there is no hole for
* writing, and there is no other threads writing (i.e., asyncStatus is
* false), start the writing and set asyncStatus to true.
*
* @return True if the new write is sequencial and we can start writing
* (including the case that there is already a thread writing).
*/
private synchronized boolean checkAndStartWrite(
AsyncDataService asyncDataService, WriteCtx writeCtx) {
if (writeCtx.getOffset() == nextOffset.get()) {
if (!asyncStatus) {
if (LOG.isDebugEnabled()) {
LOG.debug("Trigger the write back task. Current nextOffset: "
+ nextOffset.get());
}
asyncStatus = true;
asyncDataService.execute(new AsyncDataService.WriteBackTask(this));
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("The write back thread is working.");
}
}
return true;
} else {
return false;
}
}
private void receivedNewWriteInternal(DFSClient dfsClient,
WRITE3Request request, Channel channel, int xid,
AsyncDataService asyncDataService, IdUserGroup iug) {
WriteStableHow stableHow = request.getStableHow();
WccAttr preOpAttr = latestAttr.getWccAttr();
int count = request.getCount();
WriteCtx writeCtx = addWritesToCache(request, channel, xid);
if (writeCtx == null) {
// offset < nextOffset
processOverWrite(dfsClient, request, channel, xid, iug);
} else {
// The writes is added to pendingWrites.
// Check and start writing back if necessary
boolean startWriting = checkAndStartWrite(asyncDataService, writeCtx);
if (!startWriting) {
// offset > nextOffset. check if we need to dump data
checkDump();
// In test, noticed some Linux client sends a batch (e.g., 1MB)
// of reordered writes and won't send more writes until it gets
// responses of the previous batch. So here send response immediately
// for unstable non-sequential write
if (request.getStableHow() == WriteStableHow.UNSTABLE) {
if (LOG.isDebugEnabled()) {
LOG.debug("UNSTABLE write request, send response for offset: "
+ writeCtx.getOffset());
}
WccData fileWcc = new WccData(preOpAttr, latestAttr);
WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK,
fileWcc, count, stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
Nfs3Utils
.writeChannel(channel, response.writeHeaderAndResponse(new XDR(),
xid, new VerifierNone()), xid);
writeCtx.setReplied(true);
}
}
} }
} }
@ -436,7 +498,6 @@ private void receivedNewWriteInternal(DFSClient dfsClient,
private WRITE3Response processPerfectOverWrite(DFSClient dfsClient, private WRITE3Response processPerfectOverWrite(DFSClient dfsClient,
long offset, int count, WriteStableHow stableHow, byte[] data, long offset, int count, WriteStableHow stableHow, byte[] data,
String path, WccData wccData, IdUserGroup iug) { String path, WccData wccData, IdUserGroup iug) {
assert (ctxLock.isLocked());
WRITE3Response response = null; WRITE3Response response = null;
// Read the content back // Read the content back
@ -447,21 +508,30 @@ private WRITE3Response processPerfectOverWrite(DFSClient dfsClient,
try { try {
// Sync file data and length to avoid partial read failure // Sync file data and length to avoid partial read failure
fos.hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH)); fos.hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
} catch (ClosedChannelException closedException) {
LOG.info("The FSDataOutputStream has been closed. " +
"Continue processing the perfect overwrite.");
} catch (IOException e) {
LOG.info("hsync failed when processing possible perfect overwrite, path="
+ path + " error:" + e);
return new WRITE3Response(Nfs3Status.NFS3ERR_IO, wccData, 0, stableHow,
Nfs3Constant.WRITE_COMMIT_VERF);
}
try {
fis = new FSDataInputStream(dfsClient.open(path)); fis = new FSDataInputStream(dfsClient.open(path));
readCount = fis.read(offset, readbuffer, 0, count); readCount = fis.read(offset, readbuffer, 0, count);
if (readCount < count) { if (readCount < count) {
LOG.error("Can't read back " + count + " bytes, partial read size:" LOG.error("Can't read back " + count + " bytes, partial read size:"
+ readCount); + readCount);
return response = new WRITE3Response(Nfs3Status.NFS3ERR_IO, wccData, 0, return new WRITE3Response(Nfs3Status.NFS3ERR_IO, wccData, 0, stableHow,
stableHow, Nfs3Constant.WRITE_COMMIT_VERF); Nfs3Constant.WRITE_COMMIT_VERF);
} }
} catch (IOException e) { } catch (IOException e) {
LOG.info("Read failed when processing possible perfect overwrite, path=" LOG.info("Read failed when processing possible perfect overwrite, path="
+ path + " error:" + e); + path + " error:" + e);
return response = new WRITE3Response(Nfs3Status.NFS3ERR_IO, wccData, 0, return new WRITE3Response(Nfs3Status.NFS3ERR_IO, wccData, 0, stableHow,
stableHow, Nfs3Constant.WRITE_COMMIT_VERF); Nfs3Constant.WRITE_COMMIT_VERF);
} finally { } finally {
IOUtils.cleanup(LOG, fis); IOUtils.cleanup(LOG, fis);
} }
@ -493,39 +563,25 @@ private WRITE3Response processPerfectOverWrite(DFSClient dfsClient,
return response; return response;
} }
public final static int COMMIT_FINISHED = 0;
public final static int COMMIT_WAIT = 1;
public final static int COMMIT_INACTIVE_CTX = 2;
public final static int COMMIT_ERROR = 3;
/** /**
* return one commit status: COMMIT_FINISHED, COMMIT_WAIT, * return one commit status: COMMIT_FINISHED, COMMIT_WAIT,
* COMMIT_INACTIVE_CTX, COMMIT_ERROR * COMMIT_INACTIVE_CTX, COMMIT_ERROR
*/ */
public int checkCommit(long commitOffset) { public int checkCommit(long commitOffset) {
int ret = COMMIT_WAIT; return activeState ? checkCommitInternal(commitOffset)
: COMMIT_INACTIVE_CTX;
lockCtx();
try {
if (!activeState) {
ret = COMMIT_INACTIVE_CTX;
} else {
ret = checkCommitInternal(commitOffset);
}
} finally {
unlockCtx();
}
return ret;
} }
private int checkCommitInternal(long commitOffset) { private int checkCommitInternal(long commitOffset) {
if (commitOffset == 0) { if (commitOffset == 0) {
// Commit whole file // Commit whole file
commitOffset = getNextOffsetUnprotected(); commitOffset = nextOffset.get();
} }
long flushed = getFlushedOffset(); long flushed = getFlushedOffset();
LOG.info("getFlushedOffset=" + flushed + " commitOffset=" + commitOffset); if (LOG.isDebugEnabled()) {
LOG.debug("getFlushedOffset=" + flushed + " commitOffset=" + commitOffset);
}
if (flushed < commitOffset) { if (flushed < commitOffset) {
// Keep stream active // Keep stream active
updateLastAccessTime(); updateLastAccessTime();
@ -538,6 +594,13 @@ private int checkCommitInternal(long commitOffset) {
fos.hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH)); fos.hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
// Nothing to do for metadata since attr related change is pass-through // Nothing to do for metadata since attr related change is pass-through
ret = COMMIT_FINISHED; ret = COMMIT_FINISHED;
} catch (ClosedChannelException cce) {
ret = COMMIT_INACTIVE_CTX;
if (pendingWrites.isEmpty()) {
ret = COMMIT_INACTIVE_CTX;
} else {
ret = COMMIT_INACTIVE_WITH_PENDING_WRITE;
}
} catch (IOException e) { } catch (IOException e) {
LOG.error("Got stream error during data sync:" + e); LOG.error("Got stream error during data sync:" + e);
// Do nothing. Stream will be closed eventually by StreamMonitor. // Do nothing. Stream will be closed eventually by StreamMonitor.
@ -550,18 +613,16 @@ private int checkCommitInternal(long commitOffset) {
} }
private void addWrite(WriteCtx writeCtx) { private void addWrite(WriteCtx writeCtx) {
assert (ctxLock.isLocked());
long offset = writeCtx.getOffset(); long offset = writeCtx.getOffset();
int count = writeCtx.getCount(); int count = writeCtx.getCount();
pendingWrites.put(new OffsetRange(offset, offset + count), writeCtx); pendingWrites.put(new OffsetRange(offset, offset + count), writeCtx);
} }
/** /**
* Check stream status to decide if it should be closed * Check stream status to decide if it should be closed
* @return true, remove stream; false, keep stream * @return true, remove stream; false, keep stream
*/ */
public boolean streamCleanup(long fileId, long streamTimeout) { public synchronized boolean streamCleanup(long fileId, long streamTimeout) {
if (streamTimeout < WriteManager.MINIMIUM_STREAM_TIMEOUT) { if (streamTimeout < WriteManager.MINIMIUM_STREAM_TIMEOUT) {
throw new InvalidParameterException("StreamTimeout" + streamTimeout throw new InvalidParameterException("StreamTimeout" + streamTimeout
+ "ms is less than MINIMIUM_STREAM_TIMEOUT " + "ms is less than MINIMIUM_STREAM_TIMEOUT "
@ -569,107 +630,97 @@ public boolean streamCleanup(long fileId, long streamTimeout) {
} }
boolean flag = false; boolean flag = false;
if (!ctxLock.tryLock()) {
if (LOG.isTraceEnabled()) {
LOG.trace("Another thread is working on it" + ctxLock.toString());
}
return flag;
}
try {
// Check the stream timeout // Check the stream timeout
if (checkStreamTimeout(streamTimeout)) { if (checkStreamTimeout(streamTimeout)) {
LOG.info("closing stream for fileId:" + fileId); if (LOG.isDebugEnabled()) {
LOG.debug("closing stream for fileId:" + fileId);
}
cleanup(); cleanup();
flag = true; flag = true;
} }
} finally {
unlockCtx();
}
return flag; return flag;
} }
// Invoked by AsynDataService to do the write back /**
public void executeWriteBack() { * Get (and remove) the next WriteCtx from {@link #pendingWrites} if possible.
long nextOffset; *
OffsetRange key; * @return Null if {@link #pendingWrites} is null, or the next WriteCtx's
WriteCtx writeCtx; * offset is larger than nextOffSet.
*/
try { private synchronized WriteCtx offerNextToWrite() {
// Don't lock OpenFileCtx for all writes to reduce the timeout of other
// client request to the same file
while (true) {
lockCtx();
if (!asyncStatus) {
// This should never happen. There should be only one thread working
// on one OpenFileCtx anytime.
LOG.fatal("The openFileCtx has false async status");
throw new RuntimeException("The openFileCtx has false async status");
}
// Any single write failure can change activeState to false, so do the
// check each loop.
if (pendingWrites.isEmpty()) { if (pendingWrites.isEmpty()) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("The asyn write task has no pendding writes, fileId: " LOG.debug("The asyn write task has no pending writes, fileId: "
+ latestAttr.getFileId()); + latestAttr.getFileId());
} }
break; this.asyncStatus = false;
} } else {
if (!activeState) { Entry<OffsetRange, WriteCtx> lastEntry = pendingWrites.lastEntry();
if (LOG.isDebugEnabled()) { OffsetRange range = lastEntry.getKey();
LOG.debug("The openFileCtx is not active anymore, fileId: " WriteCtx toWrite = lastEntry.getValue();
+ latestAttr.getFileId());
}
break;
}
// Get the next sequential write
nextOffset = getNextOffsetUnprotected();
key = pendingWrites.firstKey();
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
LOG.trace("key.getMin()=" + key.getMin() + " nextOffset=" LOG.trace("range.getMin()=" + range.getMin() + " nextOffset="
+ nextOffset); + nextOffset);
} }
if (key.getMin() > nextOffset) { long offset = nextOffset.get();
if (range.getMin() > offset) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.info("The next sequencial write has not arrived yet"); LOG.debug("The next sequencial write has not arrived yet");
} }
break; this.asyncStatus = false;
} else if (range.getMin() < offset && range.getMax() > offset) {
} else if (key.getMin() < nextOffset && key.getMax() > nextOffset) { // shouldn't happen since we do sync for overlapped concurrent writers
// Can't handle overlapping write. Didn't see it in tests yet. LOG.warn("Got a overlapping write (" + range.getMin() + ","
LOG.fatal("Got a overlapping write (" + key.getMin() + "," + range.getMax() + "), nextOffset=" + offset
+ key.getMax() + "), nextOffset=" + nextOffset); + ". Silently drop it now");
throw new RuntimeException("Got a overlapping write (" + key.getMin() pendingWrites.remove(range);
+ "," + key.getMax() + "), nextOffset=" + nextOffset);
} else { } else {
if (LOG.isTraceEnabled()) { if (LOG.isDebugEnabled()) {
LOG.trace("Remove write(" + key.getMin() + "-" + key.getMax() LOG.debug("Remove write(" + range.getMin() + "-" + range.getMax()
+ ") from the list"); + ") from the list");
} }
writeCtx = pendingWrites.remove(key); // after writing, remove the WriteCtx from cache
pendingWrites.remove(range);
// update nextOffset
nextOffset.addAndGet(toWrite.getCount());
if (LOG.isDebugEnabled()) {
LOG.debug("Change nextOffset to " + nextOffset.get());
}
return toWrite;
}
}
return null;
}
/** Invoked by AsynDataService to write back to HDFS */
void executeWriteBack() {
Preconditions.checkState(asyncStatus,
"The openFileCtx has false async status");
try {
while (activeState) {
WriteCtx toWrite = offerNextToWrite();
if (toWrite != null) {
// Do the write // Do the write
doSingleWrite(writeCtx); doSingleWrite(toWrite);
updateLastAccessTime(); updateLastAccessTime();
} else {
break;
}
} }
unlockCtx(); if (!activeState && LOG.isDebugEnabled()) {
LOG.debug("The openFileCtx is not active anymore, fileId: "
+ +latestAttr.getFileId());
} }
} finally { } finally {
// Always reset the async status so another async task can be created // make sure we reset asyncStatus to false
// for this file
asyncStatus = false; asyncStatus = false;
if (ctxLock.isHeldByCurrentThread()) {
unlockCtx();
}
} }
} }
private void doSingleWrite(final WriteCtx writeCtx) { private void doSingleWrite(final WriteCtx writeCtx) {
assert(ctxLock.isLocked());
Channel channel = writeCtx.getChannel(); Channel channel = writeCtx.getChannel();
int xid = writeCtx.getXid(); int xid = writeCtx.getXid();
@ -679,20 +730,25 @@ private void doSingleWrite(final WriteCtx writeCtx) {
byte[] data = null; byte[] data = null;
try { try {
data = writeCtx.getData(); data = writeCtx.getData();
} catch (IOException e1) { } catch (Exception e1) {
LOG.error("Failed to get request data offset:" + offset + " count:" LOG.error("Failed to get request data offset:" + offset + " count:"
+ count + " error:" + e1); + count + " error:" + e1);
// Cleanup everything // Cleanup everything
cleanup(); cleanup();
return; return;
} }
assert (data.length == count);
Preconditions.checkState(data.length == count);
FileHandle handle = writeCtx.getHandle(); FileHandle handle = writeCtx.getHandle();
LOG.info("do write, fileId: " + handle.getFileId() + " offset: " + offset if (LOG.isDebugEnabled()) {
+ " length:" + count + " stableHow:" + stableHow.getValue()); LOG.debug("do write, fileId: " + handle.getFileId() + " offset: "
+ offset + " length:" + count + " stableHow:" + stableHow.getValue());
}
try { try {
// The write is not protected by lock. asyncState is used to make sure
// there is one thread doing write back at any time
fos.write(data, 0, count); fos.write(data, 0, count);
long flushedOffset = getFlushedOffset(); long flushedOffset = getFlushedOffset();
@ -701,12 +757,21 @@ private void doSingleWrite(final WriteCtx writeCtx) {
+ flushedOffset + " and nextOffset should be" + flushedOffset + " and nextOffset should be"
+ (offset + count)); + (offset + count));
} }
nextOffset = flushedOffset;
if (LOG.isDebugEnabled()) {
LOG.debug("After writing " + handle.getFileId() + " at offset "
+ offset + ", update the memory count.");
}
// Reduce memory occupation size if request was allowed dumped // Reduce memory occupation size if request was allowed dumped
if (writeCtx.getDataState() == DataState.ALLOW_DUMP) { if (writeCtx.getDataState() == WriteCtx.DataState.ALLOW_DUMP) {
synchronized (writeCtx) {
if (writeCtx.getDataState() == WriteCtx.DataState.ALLOW_DUMP) {
writeCtx.setDataState(WriteCtx.DataState.NO_DUMP);
updateNonSequentialWriteInMemory(-count); updateNonSequentialWriteInMemory(-count);
} }
}
}
if (!writeCtx.getReplied()) { if (!writeCtx.getReplied()) {
WccAttr preOpAttr = latestAttr.getWccAttr(); WccAttr preOpAttr = latestAttr.getWccAttr();
@ -716,7 +781,6 @@ private void doSingleWrite(final WriteCtx writeCtx) {
Nfs3Utils.writeChannel(channel, response.writeHeaderAndResponse( Nfs3Utils.writeChannel(channel, response.writeHeaderAndResponse(
new XDR(), xid, new VerifierNone()), xid); new XDR(), xid, new VerifierNone()), xid);
} }
} catch (IOException e) { } catch (IOException e) {
LOG.error("Error writing to fileId " + handle.getFileId() + " at offset " LOG.error("Error writing to fileId " + handle.getFileId() + " at offset "
+ offset + " and length " + data.length, e); + offset + " and length " + data.length, e);
@ -733,10 +797,22 @@ private void doSingleWrite(final WriteCtx writeCtx) {
} }
} }
private void cleanup() { private synchronized void cleanup() {
assert(ctxLock.isLocked()); if (!activeState) {
LOG.info("Current OpenFileCtx is already inactive, no need to cleanup.");
return;
}
activeState = false; activeState = false;
// stop the dump thread
if (dumpThread != null) {
dumpThread.interrupt();
try {
dumpThread.join(3000);
} catch (InterruptedException e) {
}
}
// Close stream // Close stream
try { try {
if (fos != null) { if (fos != null) {
@ -753,7 +829,7 @@ private void cleanup() {
while (!pendingWrites.isEmpty()) { while (!pendingWrites.isEmpty()) {
OffsetRange key = pendingWrites.firstKey(); OffsetRange key = pendingWrites.firstKey();
LOG.info("Fail pending write: (" + key.getMin() + "," + key.getMax() LOG.info("Fail pending write: (" + key.getMin() + "," + key.getMax()
+ "), nextOffset=" + getNextOffsetUnprotected()); + "), nextOffset=" + nextOffset.get());
WriteCtx writeCtx = pendingWrites.remove(key); WriteCtx writeCtx = pendingWrites.remove(key);
if (!writeCtx.getReplied()) { if (!writeCtx.getReplied()) {
@ -773,6 +849,10 @@ private void cleanup() {
} catch (IOException e) { } catch (IOException e) {
e.printStackTrace(); e.printStackTrace();
} }
File dumpFile = new File(dumpFilePath);
if (dumpFile.exists() && !dumpFile.delete()) {
LOG.error("Failed to delete dumpfile: " + dumpFile);
}
} }
if (raf != null) { if (raf != null) {
try { try {
@ -781,9 +861,5 @@ private void cleanup() {
e.printStackTrace(); e.printStackTrace();
} }
} }
File dumpFile = new File(dumpFilePath);
if (dumpFile.delete()) {
LOG.error("Failed to delete dumpfile: "+ dumpFile);
}
} }
} }

View File

@ -27,6 +27,8 @@
import org.apache.hadoop.nfs.nfs3.Nfs3Constant.WriteStableHow; import org.apache.hadoop.nfs.nfs3.Nfs3Constant.WriteStableHow;
import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.Channel;
import com.google.common.base.Preconditions;
/** /**
* WriteCtx saves the context of one write request, such as request, channel, * WriteCtx saves the context of one write request, such as request, channel,
* xid and reply status. * xid and reply status.
@ -49,13 +51,21 @@ public static enum DataState {
private final long offset; private final long offset;
private final int count; private final int count;
private final WriteStableHow stableHow; private final WriteStableHow stableHow;
private byte[] data; private volatile byte[] data;
private final Channel channel; private final Channel channel;
private final int xid; private final int xid;
private boolean replied; private boolean replied;
private DataState dataState; /**
* Data belonging to the same {@link OpenFileCtx} may be dumped to a file.
* After being dumped to the file, the corresponding {@link WriteCtx} records
* the dump file and the offset.
*/
private RandomAccessFile raf;
private long dumpFileOffset;
private volatile DataState dataState;
public DataState getDataState() { public DataState getDataState() {
return dataState; return dataState;
@ -65,11 +75,12 @@ public void setDataState(DataState dataState) {
this.dataState = dataState; this.dataState = dataState;
} }
private RandomAccessFile raf; /**
private long dumpFileOffset; * Writing the data into a local file. After the writing, if
* {@link #dataState} is still ALLOW_DUMP, set {@link #data} to null and set
// Return the dumped data size * {@link #dataState} to DUMPED.
public long dumpData(FileOutputStream dumpOut, RandomAccessFile raf) */
long dumpData(FileOutputStream dumpOut, RandomAccessFile raf)
throws IOException { throws IOException {
if (dataState != DataState.ALLOW_DUMP) { if (dataState != DataState.ALLOW_DUMP) {
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
@ -84,37 +95,54 @@ public long dumpData(FileOutputStream dumpOut, RandomAccessFile raf)
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("After dump, new dumpFileOffset:" + dumpFileOffset); LOG.debug("After dump, new dumpFileOffset:" + dumpFileOffset);
} }
// it is possible that while we dump the data, the data is also being
// written back to HDFS. After dump, if the writing back has not finished
// yet, we change its flag to DUMPED and set the data to null. Otherwise
// this WriteCtx instance should have been removed from the buffer.
if (dataState == DataState.ALLOW_DUMP) {
synchronized (this) {
if (dataState == DataState.ALLOW_DUMP) {
data = null; data = null;
dataState = DataState.DUMPED; dataState = DataState.DUMPED;
return count; return count;
} }
}
}
return 0;
}
public FileHandle getHandle() { FileHandle getHandle() {
return handle; return handle;
} }
public long getOffset() { long getOffset() {
return offset; return offset;
} }
public int getCount() { int getCount() {
return count; return count;
} }
public WriteStableHow getStableHow() { WriteStableHow getStableHow() {
return stableHow; return stableHow;
} }
public byte[] getData() throws IOException { byte[] getData() throws IOException {
if (dataState != DataState.DUMPED) { if (dataState != DataState.DUMPED) {
if (data == null) { synchronized (this) {
throw new IOException("Data is not dumpted but has null:" + this); if (dataState != DataState.DUMPED) {
Preconditions.checkState(data != null);
return data;
} }
} else {
// read back
if (data != null) {
throw new IOException("Data is dumpted but not null");
} }
}
// read back from dumped file
this.loadData();
return data;
}
private void loadData() throws IOException {
Preconditions.checkState(data == null);
data = new byte[count]; data = new byte[count];
raf.seek(dumpFileOffset); raf.seek(dumpFileOffset);
int size = raf.read(data, 0, count); int size = raf.read(data, 0, count);
@ -123,8 +151,6 @@ public byte[] getData() throws IOException {
+ size + "bytes"); + size + "bytes");
} }
} }
return data;
}
Channel getChannel() { Channel getChannel() {
return channel; return channel;

View File

@ -67,8 +67,8 @@ public class WriteManager {
*/ */
private long streamTimeout; private long streamTimeout;
public static final long DEFAULT_STREAM_TIMEOUT = 10 * 1000; // 10 second public static final long DEFAULT_STREAM_TIMEOUT = 10 * 60 * 1000; //10 minutes
public static final long MINIMIUM_STREAM_TIMEOUT = 1 * 1000; // 1 second public static final long MINIMIUM_STREAM_TIMEOUT = 10 * 1000; //10 seconds
void addOpenFileStream(FileHandle h, OpenFileCtx ctx) { void addOpenFileStream(FileHandle h, OpenFileCtx ctx) {
openFileMap.put(h, ctx); openFileMap.put(h, ctx);
@ -215,6 +215,10 @@ boolean handleCommit(FileHandle fileHandle, long commitOffset) {
LOG.info("Inactive stream, fileId=" + fileHandle.getFileId() LOG.info("Inactive stream, fileId=" + fileHandle.getFileId()
+ " commitOffset=" + commitOffset); + " commitOffset=" + commitOffset);
return true; return true;
} else if (ret == OpenFileCtx.COMMIT_INACTIVE_WITH_PENDING_WRITE) {
LOG.info("Inactive stream with pending writes, fileId="
+ fileHandle.getFileId() + " commitOffset=" + commitOffset);
return false;
} }
assert (ret == OpenFileCtx.COMMIT_WAIT || ret == OpenFileCtx.COMMIT_ERROR); assert (ret == OpenFileCtx.COMMIT_WAIT || ret == OpenFileCtx.COMMIT_ERROR);
if (ret == OpenFileCtx.COMMIT_ERROR) { if (ret == OpenFileCtx.COMMIT_ERROR) {

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.nfs.nfs3; package org.apache.hadoop.hdfs.nfs.nfs3;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertEquals;
import java.io.IOException; import java.io.IOException;
@ -51,8 +52,9 @@ public void testCompare() throws IOException {
OffsetRange r3 = new OffsetRange(1, 3); OffsetRange r3 = new OffsetRange(1, 3);
OffsetRange r4 = new OffsetRange(3, 4); OffsetRange r4 = new OffsetRange(3, 4);
assertTrue(r2.compareTo(r3) == 0); assertEquals(0, OffsetRange.ReverseComparatorOnMin.compare(r2, r3));
assertTrue(r2.compareTo(r1) == 1); assertEquals(0, OffsetRange.ReverseComparatorOnMin.compare(r2, r2));
assertTrue(r2.compareTo(r4) == -1); assertTrue(OffsetRange.ReverseComparatorOnMin.compare(r2, r1) < 0);
assertTrue(OffsetRange.ReverseComparatorOnMin.compare(r2, r4) > 0);
} }
} }

View File

@ -414,6 +414,9 @@ Release 2.1.1-beta - 2013-09-23
HDFS-5212. Refactor RpcMessage and NFS3Response to support different HDFS-5212. Refactor RpcMessage and NFS3Response to support different
types of authentication information. (jing9) types of authentication information. (jing9)
HDFS-4971. Move IO operations out of locking in OpenFileCtx. (brandonli and
jing9)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES