HDFS-5259. Support client which combines appended data with old data before sends it to NFS server. Contributed by Brandon Li
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1529730 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
f0799c5536
commit
caa4abd30c
@ -28,8 +28,8 @@
|
|||||||
* WRITE3 Request
|
* WRITE3 Request
|
||||||
*/
|
*/
|
||||||
public class WRITE3Request extends RequestWithHandle {
|
public class WRITE3Request extends RequestWithHandle {
|
||||||
private final long offset;
|
private long offset;
|
||||||
private final int count;
|
private int count;
|
||||||
private final WriteStableHow stableHow;
|
private final WriteStableHow stableHow;
|
||||||
private final ByteBuffer data;
|
private final ByteBuffer data;
|
||||||
|
|
||||||
@ -54,10 +54,18 @@ public long getOffset() {
|
|||||||
return this.offset;
|
return this.offset;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setOffset(long offset) {
|
||||||
|
this.offset = offset;
|
||||||
|
}
|
||||||
|
|
||||||
public int getCount() {
|
public int getCount() {
|
||||||
return this.count;
|
return this.count;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setCount(int count) {
|
||||||
|
this.count = count;
|
||||||
|
}
|
||||||
|
|
||||||
public WriteStableHow getStableHow() {
|
public WriteStableHow getStableHow() {
|
||||||
return this.stableHow;
|
return this.stableHow;
|
||||||
}
|
}
|
||||||
|
@ -22,6 +22,7 @@
|
|||||||
import java.io.FileOutputStream;
|
import java.io.FileOutputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.RandomAccessFile;
|
import java.io.RandomAccessFile;
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
import java.nio.channels.ClosedChannelException;
|
import java.nio.channels.ClosedChannelException;
|
||||||
import java.security.InvalidParameterException;
|
import java.security.InvalidParameterException;
|
||||||
import java.util.EnumSet;
|
import java.util.EnumSet;
|
||||||
@ -55,6 +56,7 @@
|
|||||||
import org.apache.hadoop.util.Daemon;
|
import org.apache.hadoop.util.Daemon;
|
||||||
import org.jboss.netty.channel.Channel;
|
import org.jboss.netty.channel.Channel;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -360,6 +362,30 @@ public void receivedNewWrite(DFSClient dfsClient, WRITE3Request request,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public static void alterWriteRequest(WRITE3Request request, long cachedOffset) {
|
||||||
|
long offset = request.getOffset();
|
||||||
|
int count = request.getCount();
|
||||||
|
long smallerCount = offset + count - cachedOffset;
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug(String.format("Got overwrite with appended data (%d-%d),"
|
||||||
|
+ " current offset %d," + " drop the overlapped section (%d-%d)"
|
||||||
|
+ " and append new data (%d-%d).", offset, (offset + count - 1),
|
||||||
|
cachedOffset, offset, (cachedOffset - 1), cachedOffset, (offset
|
||||||
|
+ count - 1)));
|
||||||
|
}
|
||||||
|
|
||||||
|
ByteBuffer data = request.getData();
|
||||||
|
Preconditions.checkState(data.position() == 0,
|
||||||
|
"The write request data has non-zero position");
|
||||||
|
data.position((int) (cachedOffset - offset));
|
||||||
|
Preconditions.checkState(data.limit() - data.position() == smallerCount,
|
||||||
|
"The write request buffer has wrong limit/position regarding count");
|
||||||
|
|
||||||
|
request.setOffset(cachedOffset);
|
||||||
|
request.setCount((int) smallerCount);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates and adds a WriteCtx into the pendingWrites map. This is a
|
* Creates and adds a WriteCtx into the pendingWrites map. This is a
|
||||||
* synchronized method to handle concurrent writes.
|
* synchronized method to handle concurrent writes.
|
||||||
@ -372,12 +398,40 @@ private synchronized WriteCtx addWritesToCache(WRITE3Request request,
|
|||||||
long offset = request.getOffset();
|
long offset = request.getOffset();
|
||||||
int count = request.getCount();
|
int count = request.getCount();
|
||||||
long cachedOffset = nextOffset.get();
|
long cachedOffset = nextOffset.get();
|
||||||
|
int originalCount = WriteCtx.INVALID_ORIGINAL_COUNT;
|
||||||
|
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("requesed offset=" + offset + " and current offset="
|
LOG.debug("requesed offset=" + offset + " and current offset="
|
||||||
+ cachedOffset);
|
+ cachedOffset);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Handle a special case first
|
||||||
|
if ((offset < cachedOffset) && (offset + count > cachedOffset)) {
|
||||||
|
// One Linux client behavior: after a file is closed and reopened to
|
||||||
|
// write, the client sometimes combines previous written data(could still
|
||||||
|
// be in kernel buffer) with newly appended data in one write. This is
|
||||||
|
// usually the first write after file reopened. In this
|
||||||
|
// case, we log the event and drop the overlapped section.
|
||||||
|
LOG.warn(String.format("Got overwrite with appended data (%d-%d),"
|
||||||
|
+ " current offset %d," + " drop the overlapped section (%d-%d)"
|
||||||
|
+ " and append new data (%d-%d).", offset, (offset + count - 1),
|
||||||
|
cachedOffset, offset, (cachedOffset - 1), cachedOffset, (offset
|
||||||
|
+ count - 1)));
|
||||||
|
|
||||||
|
if (!pendingWrites.isEmpty()) {
|
||||||
|
LOG.warn("There are other pending writes, fail this jumbo write");
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
LOG.warn("Modify this write to write only the appended data");
|
||||||
|
alterWriteRequest(request, cachedOffset);
|
||||||
|
|
||||||
|
// Update local variable
|
||||||
|
originalCount = count;
|
||||||
|
offset = request.getOffset();
|
||||||
|
count = request.getCount();
|
||||||
|
}
|
||||||
|
|
||||||
// Fail non-append call
|
// Fail non-append call
|
||||||
if (offset < cachedOffset) {
|
if (offset < cachedOffset) {
|
||||||
LOG.warn("(offset,count,nextOffset):" + "(" + offset + "," + count + ","
|
LOG.warn("(offset,count,nextOffset):" + "(" + offset + "," + count + ","
|
||||||
@ -387,8 +441,9 @@ private synchronized WriteCtx addWritesToCache(WRITE3Request request,
|
|||||||
DataState dataState = offset == cachedOffset ? WriteCtx.DataState.NO_DUMP
|
DataState dataState = offset == cachedOffset ? WriteCtx.DataState.NO_DUMP
|
||||||
: WriteCtx.DataState.ALLOW_DUMP;
|
: WriteCtx.DataState.ALLOW_DUMP;
|
||||||
WriteCtx writeCtx = new WriteCtx(request.getHandle(),
|
WriteCtx writeCtx = new WriteCtx(request.getHandle(),
|
||||||
request.getOffset(), request.getCount(), request.getStableHow(),
|
request.getOffset(), request.getCount(), originalCount,
|
||||||
request.getData().array(), channel, xid, false, dataState);
|
request.getStableHow(), request.getData(), channel, xid, false,
|
||||||
|
dataState);
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("Add new write to the list with nextOffset " + cachedOffset
|
LOG.debug("Add new write to the list with nextOffset " + cachedOffset
|
||||||
+ " and requesed offset=" + offset);
|
+ " and requesed offset=" + offset);
|
||||||
@ -419,8 +474,7 @@ private void processOverWrite(DFSClient dfsClient, WRITE3Request request,
|
|||||||
WRITE3Response response;
|
WRITE3Response response;
|
||||||
long cachedOffset = nextOffset.get();
|
long cachedOffset = nextOffset.get();
|
||||||
if (offset + count > cachedOffset) {
|
if (offset + count > cachedOffset) {
|
||||||
LOG.warn("Haven't noticed any partial overwrite for a sequential file"
|
LOG.warn("Treat this jumbo write as a real random write, no support.");
|
||||||
+ " write requests. Treat it as a real random write, no support.");
|
|
||||||
response = new WRITE3Response(Nfs3Status.NFS3ERR_INVAL, wccData, 0,
|
response = new WRITE3Response(Nfs3Status.NFS3ERR_INVAL, wccData, 0,
|
||||||
WriteStableHow.UNSTABLE, Nfs3Constant.WRITE_COMMIT_VERF);
|
WriteStableHow.UNSTABLE, Nfs3Constant.WRITE_COMMIT_VERF);
|
||||||
} else {
|
} else {
|
||||||
@ -633,6 +687,7 @@ private int checkCommitInternal(long commitOffset) {
|
|||||||
private void addWrite(WriteCtx writeCtx) {
|
private void addWrite(WriteCtx writeCtx) {
|
||||||
long offset = writeCtx.getOffset();
|
long offset = writeCtx.getOffset();
|
||||||
int count = writeCtx.getCount();
|
int count = writeCtx.getCount();
|
||||||
|
// For the offset range (min, max), min is inclusive, and max is exclusive
|
||||||
pendingWrites.put(new OffsetRange(offset, offset + count), writeCtx);
|
pendingWrites.put(new OffsetRange(offset, offset + count), writeCtx);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -745,18 +800,6 @@ private void doSingleWrite(final WriteCtx writeCtx) {
|
|||||||
long offset = writeCtx.getOffset();
|
long offset = writeCtx.getOffset();
|
||||||
int count = writeCtx.getCount();
|
int count = writeCtx.getCount();
|
||||||
WriteStableHow stableHow = writeCtx.getStableHow();
|
WriteStableHow stableHow = writeCtx.getStableHow();
|
||||||
byte[] data = null;
|
|
||||||
try {
|
|
||||||
data = writeCtx.getData();
|
|
||||||
} catch (Exception e1) {
|
|
||||||
LOG.error("Failed to get request data offset:" + offset + " count:"
|
|
||||||
+ count + " error:" + e1);
|
|
||||||
// Cleanup everything
|
|
||||||
cleanup();
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
Preconditions.checkState(data.length == count);
|
|
||||||
|
|
||||||
FileHandle handle = writeCtx.getHandle();
|
FileHandle handle = writeCtx.getHandle();
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
@ -767,7 +810,7 @@ private void doSingleWrite(final WriteCtx writeCtx) {
|
|||||||
try {
|
try {
|
||||||
// The write is not protected by lock. asyncState is used to make sure
|
// The write is not protected by lock. asyncState is used to make sure
|
||||||
// there is one thread doing write back at any time
|
// there is one thread doing write back at any time
|
||||||
fos.write(data, 0, count);
|
writeCtx.writeData(fos);
|
||||||
|
|
||||||
long flushedOffset = getFlushedOffset();
|
long flushedOffset = getFlushedOffset();
|
||||||
if (flushedOffset != (offset + count)) {
|
if (flushedOffset != (offset + count)) {
|
||||||
@ -776,10 +819,6 @@ private void doSingleWrite(final WriteCtx writeCtx) {
|
|||||||
+ (offset + count));
|
+ (offset + count));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (LOG.isDebugEnabled()) {
|
|
||||||
LOG.debug("After writing " + handle.getFileId() + " at offset "
|
|
||||||
+ offset + ", update the memory count.");
|
|
||||||
}
|
|
||||||
|
|
||||||
// Reduce memory occupation size if request was allowed dumped
|
// Reduce memory occupation size if request was allowed dumped
|
||||||
if (writeCtx.getDataState() == WriteCtx.DataState.ALLOW_DUMP) {
|
if (writeCtx.getDataState() == WriteCtx.DataState.ALLOW_DUMP) {
|
||||||
@ -787,6 +826,11 @@ private void doSingleWrite(final WriteCtx writeCtx) {
|
|||||||
if (writeCtx.getDataState() == WriteCtx.DataState.ALLOW_DUMP) {
|
if (writeCtx.getDataState() == WriteCtx.DataState.ALLOW_DUMP) {
|
||||||
writeCtx.setDataState(WriteCtx.DataState.NO_DUMP);
|
writeCtx.setDataState(WriteCtx.DataState.NO_DUMP);
|
||||||
updateNonSequentialWriteInMemory(-count);
|
updateNonSequentialWriteInMemory(-count);
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("After writing " + handle.getFileId() + " at offset "
|
||||||
|
+ offset + ", updated the memory count, new value:"
|
||||||
|
+ nonSequentialWriteInMemory.get());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -794,6 +838,11 @@ private void doSingleWrite(final WriteCtx writeCtx) {
|
|||||||
if (!writeCtx.getReplied()) {
|
if (!writeCtx.getReplied()) {
|
||||||
WccAttr preOpAttr = latestAttr.getWccAttr();
|
WccAttr preOpAttr = latestAttr.getWccAttr();
|
||||||
WccData fileWcc = new WccData(preOpAttr, latestAttr);
|
WccData fileWcc = new WccData(preOpAttr, latestAttr);
|
||||||
|
if (writeCtx.getOriginalCount() != WriteCtx.INVALID_ORIGINAL_COUNT) {
|
||||||
|
LOG.warn("Return original count:" + writeCtx.getOriginalCount()
|
||||||
|
+ " instead of real data count:" + count);
|
||||||
|
count = writeCtx.getOriginalCount();
|
||||||
|
}
|
||||||
WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK,
|
WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK,
|
||||||
fileWcc, count, stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
|
fileWcc, count, stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
|
||||||
Nfs3Utils.writeChannel(channel, response.writeHeaderAndResponse(
|
Nfs3Utils.writeChannel(channel, response.writeHeaderAndResponse(
|
||||||
@ -801,7 +850,7 @@ private void doSingleWrite(final WriteCtx writeCtx) {
|
|||||||
}
|
}
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
LOG.error("Error writing to fileId " + handle.getFileId() + " at offset "
|
LOG.error("Error writing to fileId " + handle.getFileId() + " at offset "
|
||||||
+ offset + " and length " + data.length, e);
|
+ offset + " and length " + count, e);
|
||||||
if (!writeCtx.getReplied()) {
|
if (!writeCtx.getReplied()) {
|
||||||
WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_IO);
|
WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_IO);
|
||||||
Nfs3Utils.writeChannel(channel, response.writeHeaderAndResponse(
|
Nfs3Utils.writeChannel(channel, response.writeHeaderAndResponse(
|
||||||
|
@ -20,13 +20,16 @@
|
|||||||
import java.io.FileOutputStream;
|
import java.io.FileOutputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.RandomAccessFile;
|
import java.io.RandomAccessFile;
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
|
||||||
import org.apache.hadoop.nfs.nfs3.FileHandle;
|
import org.apache.hadoop.nfs.nfs3.FileHandle;
|
||||||
import org.apache.hadoop.nfs.nfs3.Nfs3Constant.WriteStableHow;
|
import org.apache.hadoop.nfs.nfs3.Nfs3Constant.WriteStableHow;
|
||||||
import org.jboss.netty.channel.Channel;
|
import org.jboss.netty.channel.Channel;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -50,8 +53,17 @@ public static enum DataState {
|
|||||||
private final FileHandle handle;
|
private final FileHandle handle;
|
||||||
private final long offset;
|
private final long offset;
|
||||||
private final int count;
|
private final int count;
|
||||||
|
|
||||||
|
//Only needed for overlapped write, referring OpenFileCtx.addWritesToCache()
|
||||||
|
private final int originalCount;
|
||||||
|
public static final int INVALID_ORIGINAL_COUNT = -1;
|
||||||
|
|
||||||
|
public int getOriginalCount() {
|
||||||
|
return originalCount;
|
||||||
|
}
|
||||||
|
|
||||||
private final WriteStableHow stableHow;
|
private final WriteStableHow stableHow;
|
||||||
private volatile byte[] data;
|
private volatile ByteBuffer data;
|
||||||
|
|
||||||
private final Channel channel;
|
private final Channel channel;
|
||||||
private final int xid;
|
private final int xid;
|
||||||
@ -89,9 +101,13 @@ long dumpData(FileOutputStream dumpOut, RandomAccessFile raf)
|
|||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Resized write should not allow dump
|
||||||
|
Preconditions.checkState(originalCount == INVALID_ORIGINAL_COUNT);
|
||||||
|
|
||||||
this.raf = raf;
|
this.raf = raf;
|
||||||
dumpFileOffset = dumpOut.getChannel().position();
|
dumpFileOffset = dumpOut.getChannel().position();
|
||||||
dumpOut.write(data, 0, count);
|
dumpOut.write(data.array(), 0, count);
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("After dump, new dumpFileOffset:" + dumpFileOffset);
|
LOG.debug("After dump, new dumpFileOffset:" + dumpFileOffset);
|
||||||
}
|
}
|
||||||
@ -127,7 +143,8 @@ WriteStableHow getStableHow() {
|
|||||||
return stableHow;
|
return stableHow;
|
||||||
}
|
}
|
||||||
|
|
||||||
byte[] getData() throws IOException {
|
@VisibleForTesting
|
||||||
|
ByteBuffer getData() throws IOException {
|
||||||
if (dataState != DataState.DUMPED) {
|
if (dataState != DataState.DUMPED) {
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
if (dataState != DataState.DUMPED) {
|
if (dataState != DataState.DUMPED) {
|
||||||
@ -143,13 +160,43 @@ byte[] getData() throws IOException {
|
|||||||
|
|
||||||
private void loadData() throws IOException {
|
private void loadData() throws IOException {
|
||||||
Preconditions.checkState(data == null);
|
Preconditions.checkState(data == null);
|
||||||
data = new byte[count];
|
byte[] rawData = new byte[count];
|
||||||
raf.seek(dumpFileOffset);
|
raf.seek(dumpFileOffset);
|
||||||
int size = raf.read(data, 0, count);
|
int size = raf.read(rawData, 0, count);
|
||||||
if (size != count) {
|
if (size != count) {
|
||||||
throw new IOException("Data count is " + count + ", but read back "
|
throw new IOException("Data count is " + count + ", but read back "
|
||||||
+ size + "bytes");
|
+ size + "bytes");
|
||||||
}
|
}
|
||||||
|
data = ByteBuffer.wrap(rawData);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void writeData(HdfsDataOutputStream fos) throws IOException {
|
||||||
|
Preconditions.checkState(fos != null);
|
||||||
|
|
||||||
|
ByteBuffer dataBuffer = null;
|
||||||
|
try {
|
||||||
|
dataBuffer = getData();
|
||||||
|
} catch (Exception e1) {
|
||||||
|
LOG.error("Failed to get request data offset:" + offset + " count:"
|
||||||
|
+ count + " error:" + e1);
|
||||||
|
throw new IOException("Can't get WriteCtx.data");
|
||||||
|
}
|
||||||
|
|
||||||
|
byte[] data = dataBuffer.array();
|
||||||
|
int position = dataBuffer.position();
|
||||||
|
int limit = dataBuffer.limit();
|
||||||
|
Preconditions.checkState(limit - position == count);
|
||||||
|
// Modified write has a valid original count
|
||||||
|
if (position != 0) {
|
||||||
|
if (limit != getOriginalCount()) {
|
||||||
|
throw new IOException("Modified write has differnt original size."
|
||||||
|
+ "buff position:" + position + " buff limit:" + limit + ". "
|
||||||
|
+ toString());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Now write data
|
||||||
|
fos.write(data, position, count);
|
||||||
}
|
}
|
||||||
|
|
||||||
Channel getChannel() {
|
Channel getChannel() {
|
||||||
@ -168,11 +215,13 @@ void setReplied(boolean replied) {
|
|||||||
this.replied = replied;
|
this.replied = replied;
|
||||||
}
|
}
|
||||||
|
|
||||||
WriteCtx(FileHandle handle, long offset, int count, WriteStableHow stableHow,
|
WriteCtx(FileHandle handle, long offset, int count, int originalCount,
|
||||||
byte[] data, Channel channel, int xid, boolean replied, DataState dataState) {
|
WriteStableHow stableHow, ByteBuffer data, Channel channel, int xid,
|
||||||
|
boolean replied, DataState dataState) {
|
||||||
this.handle = handle;
|
this.handle = handle;
|
||||||
this.offset = offset;
|
this.offset = offset;
|
||||||
this.count = count;
|
this.count = count;
|
||||||
|
this.originalCount = originalCount;
|
||||||
this.stableHow = stableHow;
|
this.stableHow = stableHow;
|
||||||
this.data = data;
|
this.data = data;
|
||||||
this.channel = channel;
|
this.channel = channel;
|
||||||
@ -185,7 +234,7 @@ void setReplied(boolean replied) {
|
|||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return "Id:" + handle.getFileId() + " offset:" + offset + " count:" + count
|
return "Id:" + handle.getFileId() + " offset:" + offset + " count:" + count
|
||||||
+ " stableHow:" + stableHow + " replied:" + replied + " dataState:"
|
+ " originalCount:" + originalCount + " stableHow:" + stableHow
|
||||||
+ dataState + " xid:" + xid;
|
+ " replied:" + replied + " dataState:" + dataState + " xid:" + xid;
|
||||||
}
|
}
|
||||||
}
|
}
|
@ -0,0 +1,100 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.nfs.nfs3;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
|
|
||||||
|
import junit.framework.Assert;
|
||||||
|
|
||||||
|
import org.apache.hadoop.nfs.nfs3.FileHandle;
|
||||||
|
import org.apache.hadoop.nfs.nfs3.Nfs3Constant.WriteStableHow;
|
||||||
|
import org.apache.hadoop.nfs.nfs3.request.WRITE3Request;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class TestWrites {
|
||||||
|
@Test
|
||||||
|
public void testAlterWriteRequest() throws IOException {
|
||||||
|
int len = 20;
|
||||||
|
byte[] data = new byte[len];
|
||||||
|
ByteBuffer buffer = ByteBuffer.wrap(data);
|
||||||
|
|
||||||
|
for (int i = 0; i < len; i++) {
|
||||||
|
buffer.put((byte) i);
|
||||||
|
}
|
||||||
|
buffer.flip();
|
||||||
|
int originalCount = buffer.array().length;
|
||||||
|
WRITE3Request request = new WRITE3Request(new FileHandle(), 0, data.length,
|
||||||
|
WriteStableHow.UNSTABLE, buffer);
|
||||||
|
|
||||||
|
WriteCtx writeCtx1 = new WriteCtx(request.getHandle(), request.getOffset(),
|
||||||
|
request.getCount(), WriteCtx.INVALID_ORIGINAL_COUNT,
|
||||||
|
request.getStableHow(), request.getData(), null, 1, false,
|
||||||
|
WriteCtx.DataState.NO_DUMP);
|
||||||
|
|
||||||
|
Assert.assertTrue(writeCtx1.getData().array().length == originalCount);
|
||||||
|
|
||||||
|
// Now change the write request
|
||||||
|
OpenFileCtx.alterWriteRequest(request, 12);
|
||||||
|
|
||||||
|
WriteCtx writeCtx2 = new WriteCtx(request.getHandle(), request.getOffset(),
|
||||||
|
request.getCount(), originalCount, request.getStableHow(),
|
||||||
|
request.getData(), null, 2, false, WriteCtx.DataState.NO_DUMP);
|
||||||
|
ByteBuffer appendedData = writeCtx2.getData();
|
||||||
|
|
||||||
|
int position = appendedData.position();
|
||||||
|
int limit = appendedData.limit();
|
||||||
|
Assert.assertTrue(position == 12);
|
||||||
|
Assert.assertTrue(limit - position == 8);
|
||||||
|
Assert.assertTrue(appendedData.get(position) == (byte) 12);
|
||||||
|
Assert.assertTrue(appendedData.get(position + 1) == (byte) 13);
|
||||||
|
Assert.assertTrue(appendedData.get(position + 2) == (byte) 14);
|
||||||
|
Assert.assertTrue(appendedData.get(position + 7) == (byte) 19);
|
||||||
|
|
||||||
|
// Test current file write offset is at boundaries
|
||||||
|
buffer.position(0);
|
||||||
|
request = new WRITE3Request(new FileHandle(), 0, data.length,
|
||||||
|
WriteStableHow.UNSTABLE, buffer);
|
||||||
|
OpenFileCtx.alterWriteRequest(request, 1);
|
||||||
|
WriteCtx writeCtx3 = new WriteCtx(request.getHandle(), request.getOffset(),
|
||||||
|
request.getCount(), originalCount, request.getStableHow(),
|
||||||
|
request.getData(), null, 2, false, WriteCtx.DataState.NO_DUMP);
|
||||||
|
appendedData = writeCtx3.getData();
|
||||||
|
position = appendedData.position();
|
||||||
|
limit = appendedData.limit();
|
||||||
|
Assert.assertTrue(position == 1);
|
||||||
|
Assert.assertTrue(limit - position == 19);
|
||||||
|
Assert.assertTrue(appendedData.get(position) == (byte) 1);
|
||||||
|
Assert.assertTrue(appendedData.get(position + 18) == (byte) 19);
|
||||||
|
|
||||||
|
// Reset buffer position before test another boundary
|
||||||
|
buffer.position(0);
|
||||||
|
request = new WRITE3Request(new FileHandle(), 0, data.length,
|
||||||
|
WriteStableHow.UNSTABLE, buffer);
|
||||||
|
OpenFileCtx.alterWriteRequest(request, 19);
|
||||||
|
WriteCtx writeCtx4 = new WriteCtx(request.getHandle(), request.getOffset(),
|
||||||
|
request.getCount(), originalCount, request.getStableHow(),
|
||||||
|
request.getData(), null, 2, false, WriteCtx.DataState.NO_DUMP);
|
||||||
|
appendedData = writeCtx4.getData();
|
||||||
|
position = appendedData.position();
|
||||||
|
limit = appendedData.limit();
|
||||||
|
Assert.assertTrue(position == 19);
|
||||||
|
Assert.assertTrue(limit - position == 1);
|
||||||
|
Assert.assertTrue(appendedData.get(position) == (byte) 19);
|
||||||
|
}
|
||||||
|
}
|
@ -403,6 +403,9 @@ Release 2.1.2 - UNRELEASED
|
|||||||
HDFS-5299. DFS client hangs in updatePipeline RPC when failover happened.
|
HDFS-5299. DFS client hangs in updatePipeline RPC when failover happened.
|
||||||
(Vinay via jing9)
|
(Vinay via jing9)
|
||||||
|
|
||||||
|
HDFS-5259. Support client which combines appended data with old data
|
||||||
|
before sends it to NFS server. (brandonli)
|
||||||
|
|
||||||
Release 2.1.1-beta - 2013-09-23
|
Release 2.1.1-beta - 2013-09-23
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
Loading…
Reference in New Issue
Block a user