HDFS-5078 Support file append in NFSv3 gateway to enable data streaming to HDFS. Contributed by Brandon Li
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1518292 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
53f559dd79
commit
30b8ef91a3
@ -201,4 +201,8 @@ public static WriteStableHow fromValue(int id) {
|
||||
public static final String EXPORTS_CACHE_EXPIRYTIME_MILLIS_KEY = "hdfs.nfs.exports.cache.expirytime.millis";
|
||||
public static final long EXPORTS_CACHE_EXPIRYTIME_MILLIS_DEFAULT = 15 * 60 * 1000; // 15 min
|
||||
|
||||
public static final String FILE_DUMP_DIR_KEY = "dfs.nfs3.dump.dir";
|
||||
public static final String FILE_DUMP_DIR_DEFAULT = "/tmp/.hdfs-nfs";
|
||||
public static final String ENABLE_FILE_DUMP_KEY = "dfs.nfs3.enableDump";
|
||||
public static final boolean ENABLE_FILE_DUMP_DEFAULT = true;
|
||||
}
|
||||
|
@ -126,6 +126,9 @@ private long updateNonSequentialWriteInMemory(long count) {
|
||||
nonSequentialWriteInMemory = 0;
|
||||
this.dumpFilePath = dumpFilePath;
|
||||
enabledDump = dumpFilePath == null ? false: true;
|
||||
nextOffset = latestAttr.getSize();
|
||||
assert(nextOffset == this.fos.getPos());
|
||||
|
||||
ctxLock = new ReentrantLock(true);
|
||||
}
|
||||
|
||||
@ -686,11 +689,13 @@ private void doSingleWrite(final WriteCtx writeCtx) {
|
||||
try {
|
||||
fos.write(data, 0, count);
|
||||
|
||||
if (fos.getPos() != (offset + count)) {
|
||||
long flushedOffset = getFlushedOffset();
|
||||
if (flushedOffset != (offset + count)) {
|
||||
throw new IOException("output stream is out of sync, pos="
|
||||
+ fos.getPos() + " and nextOffset should be" + (offset + count));
|
||||
+ flushedOffset + " and nextOffset should be"
|
||||
+ (offset + count));
|
||||
}
|
||||
nextOffset = fos.getPos();
|
||||
nextOffset = flushedOffset;
|
||||
|
||||
// Reduce memory occupation size if request was allowed dumped
|
||||
if (writeCtx.getDataState() == DataState.ALLOW_DUMP) {
|
||||
|
@ -29,6 +29,7 @@
|
||||
import org.apache.hadoop.fs.CreateFlag;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FileSystem.Statistics;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||
import org.apache.hadoop.fs.FileUtil;
|
||||
import org.apache.hadoop.fs.FsStatus;
|
||||
import org.apache.hadoop.fs.Options;
|
||||
@ -123,7 +124,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
|
||||
|
||||
private final Configuration config = new Configuration();
|
||||
private final WriteManager writeManager;
|
||||
private final IdUserGroup iug;// = new IdUserGroup();
|
||||
private final IdUserGroup iug;
|
||||
private final DFSClientCache clientCache;
|
||||
|
||||
private final NfsExports exports;
|
||||
@ -161,10 +162,14 @@ public RpcProgramNfs3(Configuration config)
|
||||
DFSConfigKeys.DFS_REPLICATION_DEFAULT);
|
||||
blockSize = config.getLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY,
|
||||
DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT);
|
||||
bufferSize = config.getInt("io.file.buffer.size", 4096);
|
||||
bufferSize = config.getInt(
|
||||
CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
|
||||
CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT);
|
||||
|
||||
writeDumpDir = config.get("dfs.nfs3.dump.dir", "/tmp/.hdfs-nfs");
|
||||
boolean enableDump = config.getBoolean("dfs.nfs3.enableDump", true);
|
||||
writeDumpDir = config.get(Nfs3Constant.FILE_DUMP_DIR_KEY,
|
||||
Nfs3Constant.FILE_DUMP_DIR_DEFAULT);
|
||||
boolean enableDump = config.getBoolean(Nfs3Constant.ENABLE_FILE_DUMP_KEY,
|
||||
Nfs3Constant.ENABLE_FILE_DUMP_DEFAULT);
|
||||
if (!enableDump) {
|
||||
writeDumpDir = null;
|
||||
} else {
|
||||
@ -1112,6 +1117,7 @@ public RENAME3Response rename(XDR xdr, RpcAuthSys authSys, InetAddress client) {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public SYMLINK3Response symlink(XDR xdr, RpcAuthSys authSys,
|
||||
InetAddress client) {
|
||||
return new SYMLINK3Response(Nfs3Status.NFS3ERR_NOTSUPP);
|
||||
|
@ -25,7 +25,9 @@
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||
import org.apache.hadoop.hdfs.DFSClient;
|
||||
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
|
||||
import org.apache.hadoop.nfs.NfsFileType;
|
||||
import org.apache.hadoop.nfs.nfs3.FileHandle;
|
||||
import org.apache.hadoop.nfs.nfs3.IdUserGroup;
|
||||
@ -48,6 +50,7 @@
|
||||
public class WriteManager {
|
||||
public static final Log LOG = LogFactory.getLog(WriteManager.class);
|
||||
|
||||
private final Configuration config;
|
||||
private final IdUserGroup iug;
|
||||
private final ConcurrentMap<FileHandle, OpenFileCtx> openFileMap = Maps
|
||||
.newConcurrentMap();
|
||||
@ -76,6 +79,7 @@ void addOpenFileStream(FileHandle h, OpenFileCtx ctx) {
|
||||
|
||||
WriteManager(IdUserGroup iug, final Configuration config) {
|
||||
this.iug = iug;
|
||||
this.config = config;
|
||||
|
||||
streamTimeout = config.getLong("dfs.nfs3.stream.timeout",
|
||||
DEFAULT_STREAM_TIMEOUT);
|
||||
@ -129,7 +133,25 @@ void handleWrite(DFSClient dfsClient, WRITE3Request request, Channel channel,
|
||||
OpenFileCtx openFileCtx = openFileMap.get(fileHandle);
|
||||
if (openFileCtx == null) {
|
||||
LOG.info("No opened stream for fileId:" + fileHandle.getFileId());
|
||||
WccData fileWcc = new WccData(Nfs3Utils.getWccAttr(preOpAttr), preOpAttr);
|
||||
|
||||
String fileIdPath = Nfs3Utils.getFileIdPath(fileHandle.getFileId());
|
||||
HdfsDataOutputStream fos = null;
|
||||
Nfs3FileAttributes latestAttr = null;
|
||||
try {
|
||||
int bufferSize = config.getInt(
|
||||
CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
|
||||
CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT);
|
||||
|
||||
fos = dfsClient.append(fileIdPath, bufferSize, null, null);
|
||||
|
||||
latestAttr = Nfs3Utils.getFileAttr(dfsClient, fileIdPath, iug);
|
||||
} catch (IOException e) {
|
||||
LOG.error("Can't apapend to file:" + fileIdPath + ", error:" + e);
|
||||
if (fos != null) {
|
||||
fos.close();
|
||||
}
|
||||
WccData fileWcc = new WccData(Nfs3Utils.getWccAttr(preOpAttr),
|
||||
preOpAttr);
|
||||
WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_IO,
|
||||
fileWcc, count, request.getStableHow(),
|
||||
Nfs3Constant.WRITE_COMMIT_VERF);
|
||||
@ -137,6 +159,17 @@ void handleWrite(DFSClient dfsClient, WRITE3Request request, Channel channel,
|
||||
return;
|
||||
}
|
||||
|
||||
// Add open stream
|
||||
String writeDumpDir = config.get(Nfs3Constant.FILE_DUMP_DIR_KEY,
|
||||
Nfs3Constant.FILE_DUMP_DIR_DEFAULT);
|
||||
openFileCtx = new OpenFileCtx(fos, latestAttr, writeDumpDir + "/"
|
||||
+ fileHandle.getFileId());
|
||||
addOpenFileStream(fileHandle, openFileCtx);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("opened stream for file:" + fileHandle.getFileId());
|
||||
}
|
||||
}
|
||||
|
||||
// Add write into the async job queue
|
||||
openFileCtx.receivedNewWrite(dfsClient, request, channel, xid,
|
||||
asyncDataService, iug);
|
||||
|
@ -307,6 +307,9 @@ Release 2.1.1-beta - UNRELEASED
|
||||
HDFS-4947 Add NFS server export table to control export by hostname or
|
||||
IP range (Jing Zhao via brandonli)
|
||||
|
||||
HDFS-5078 Support file append in NFSv3 gateway to enable data streaming
|
||||
to HDFS (brandonli)
|
||||
|
||||
IMPROVEMENTS
|
||||
|
||||
HDFS-4513. Clarify in the WebHDFS REST API that all JSON respsonses may
|
||||
|
Loading…
Reference in New Issue
Block a user