HDFS-4912. Cleanup FSNamesystem#startFileInternal. Contributed by Suresh Srinivas.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1502634 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Suresh Srinivas 2013-07-12 17:44:00 +00:00
parent 3630a23c55
commit b47df4e3b9
4 changed files with 94 additions and 78 deletions

View File

@ -435,6 +435,8 @@ Release 2.1.0-beta - 2013-07-02
HDFS-4645. Move from randomly generated block ID to sequentially generated
block ID. (Arpit Agarwal via szetszwo)
HDFS-4912. Cleanup FSNamesystem#startFileInternal. (suresh)
OPTIMIZATIONS
HDFS-4465. Optimize datanode ReplicasMap and ReplicaInfo. (atm)

View File

@ -1892,6 +1892,7 @@ private HdfsFileStatus startFileInt(String src, PermissionStatus permissions,
if (!DFSUtil.isValidName(src)) {
throw new InvalidPathException(src);
}
blockManager.verifyReplication(src, replication, clientMachine);
boolean skipSync = false;
final HdfsFileStatus stat;
@ -1903,6 +1904,8 @@ private HdfsFileStatus startFileInt(String src, PermissionStatus permissions,
+ "): " + blockSize + " < " + minBlockSize);
}
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
boolean create = flag.contains(CreateFlag.CREATE);
boolean overwrite = flag.contains(CreateFlag.OVERWRITE);
writeLock();
try {
checkOperation(OperationCategory.WRITE);
@ -1910,8 +1913,8 @@ private HdfsFileStatus startFileInt(String src, PermissionStatus permissions,
throw new SafeModeException("Cannot create file" + src, safeMode);
}
src = FSDirectory.resolvePath(src, pathComponents, dir);
startFileInternal(pc, src, permissions, holder, clientMachine, flag,
createParent, replication, blockSize);
startFileInternal(pc, src, permissions, holder, clientMachine,
create, overwrite, createParent, replication, blockSize);
stat = dir.getFileInfo(src, false);
} catch (StandbyException se) {
skipSync = true;
@ -1930,25 +1933,18 @@ private HdfsFileStatus startFileInt(String src, PermissionStatus permissions,
}
/**
* Create new or open an existing file for append.<p>
* Create a new file or overwrite an existing file<br>
*
* In case of opening the file for append, the method returns the last
* block of the file if this is a partial block, which can still be used
* for writing more data. The client uses the returned block locations
* to form the data pipeline for this block.<br>
* The method returns null if the last block is full or if this is a
* new file. The client then allocates a new block with the next call
* using {@link NameNode#addBlock()}.<p>
*
* For description of parameters and exceptions thrown see
* Once the file is created, the client then allocates a new block with the next
* call using {@link NameNode#addBlock()}.
* <p>
* For description of parameters and exceptions thrown see
* {@link ClientProtocol#create()}
*
* @return the last block locations if the block is partial or null otherwise
*/
private LocatedBlock startFileInternal(FSPermissionChecker pc, String src,
private void startFileInternal(FSPermissionChecker pc, String src,
PermissionStatus permissions, String holder, String clientMachine,
EnumSet<CreateFlag> flag, boolean createParent, short replication,
long blockSize) throws SafeModeException, FileAlreadyExistsException,
boolean create, boolean overwrite, boolean createParent,
short replication, long blockSize) throws FileAlreadyExistsException,
AccessControlException, UnresolvedLinkException, FileNotFoundException,
ParentNotDirectoryException, IOException {
assert hasWriteLock();
@ -1960,11 +1956,8 @@ private LocatedBlock startFileInternal(FSPermissionChecker pc, String src,
+ "; already exists as a directory.");
}
final INodeFile myFile = INodeFile.valueOf(inode, src, true);
boolean overwrite = flag.contains(CreateFlag.OVERWRITE);
boolean append = flag.contains(CreateFlag.APPEND);
if (isPermissionEnabled) {
if (append || (overwrite && myFile != null)) {
if (overwrite && myFile != null) {
checkPathAccess(pc, src, FsAction.WRITE);
} else {
checkAncestorAccess(pc, src, FsAction.WRITE);
@ -1976,65 +1969,95 @@ private LocatedBlock startFileInternal(FSPermissionChecker pc, String src,
}
try {
blockManager.verifyReplication(src, replication, clientMachine);
boolean create = flag.contains(CreateFlag.CREATE);
if (myFile == null) {
if (!create) {
throw new FileNotFoundException("failed to overwrite or append to non-existent file "
throw new FileNotFoundException("failed to overwrite non-existent file "
+ src + " on client " + clientMachine);
}
} else {
// File exists - must be one of append or overwrite
if (overwrite) {
delete(src, true);
delete(src, true); // File exists - delete if overwrite
} else {
// Opening an existing file for write - may need to recover lease.
// If lease soft limit time is expired, recover the lease
recoverLeaseInternal(myFile, src, holder, clientMachine, false);
if (!append) {
throw new FileAlreadyExistsException("failed to create file " + src
+ " on client " + clientMachine
+ " because the file exists");
}
throw new FileAlreadyExistsException("failed to create file " + src
+ " on client " + clientMachine + " because the file exists");
}
}
checkFsObjectLimit();
final DatanodeDescriptor clientNode =
blockManager.getDatanodeManager().getDatanodeByHost(clientMachine);
if (append && myFile != null) {
final INodeFile f = INodeFile.valueOf(myFile, src);
return prepareFileForWrite(src, f, holder, clientMachine, clientNode,
true, iip.getLatestSnapshot());
} else {
// Now we can add the name to the filesystem. This file has no
// blocks associated with it.
//
checkFsObjectLimit();
INodeFileUnderConstruction newNode = dir.addFile(src, permissions,
replication, blockSize, holder, clientMachine, clientNode);
if (newNode == null) {
throw new IOException("DIR* NameSystem.startFile: " +
"Unable to add file to namespace.");
}
leaseManager.addLease(newNode.getClientName(), src);
// increment global generation stamp
INodeFileUnderConstruction newNode = dir.addFile(src, permissions,
replication, blockSize, holder, clientMachine, clientNode);
if (newNode == null) {
throw new IOException("DIR* NameSystem.startFile: " +
"Unable to add file to namespace.");
}
leaseManager.addLease(newNode.getClientName(), src);
// record file record in log, record new generation stamp
getEditLog().logOpenFile(src, newNode);
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: "
+"add "+src+" to namespace for "+holder);
}
// record file record in log, record new generation stamp
getEditLog().logOpenFile(src, newNode);
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: "
+"add "+src+" to namespace for "+holder);
}
} catch (IOException ie) {
NameNode.stateChangeLog.warn("DIR* NameSystem.startFile: "
+ie.getMessage());
throw ie;
}
return null;
}
/**
* Append to an existing file.
* <p>
*
* The method returns the last block of the file if this is a partial block,
* which can still be used for writing more data. The client uses the returned
* block locations to form the data pipeline for this block.<br>
* The method returns null if the last block is full. The client then
* allocates a new block with the next call using {@link NameNode#addBlock()}.
* <p>
*
* For description of parameters and exceptions thrown see
* {@link ClientProtocol#append(String, String)}
*
* @return the last block locations if the block is partial or null otherwise
*/
private LocatedBlock appendFileInternal(FSPermissionChecker pc, String src,
String holder, String clientMachine) throws AccessControlException,
UnresolvedLinkException, FileNotFoundException, IOException {
assert hasWriteLock();
// Verify that the destination does not exist as a directory already.
final INodesInPath iip = dir.getINodesInPath4Write(src);
final INode inode = iip.getLastINode();
if (inode != null && inode.isDirectory()) {
throw new FileAlreadyExistsException("Cannot append to directory " + src
+ "; already exists as a directory.");
}
if (isPermissionEnabled) {
checkPathAccess(pc, src, FsAction.WRITE);
}
try {
if (inode == null) {
throw new FileNotFoundException("failed to append to non-existent file "
+ src + " on client " + clientMachine);
}
final INodeFile myFile = INodeFile.valueOf(inode, src, true);
// Opening an existing file for write - may need to recover lease.
recoverLeaseInternal(myFile, src, holder, clientMachine, false);
final DatanodeDescriptor clientNode =
blockManager.getDatanodeManager().getDatanodeByHost(clientMachine);
return prepareFileForWrite(src, myFile, holder, clientMachine, clientNode,
true, iip.getLatestSnapshot());
} catch (IOException ie) {
NameNode.stateChangeLog.warn("DIR* NameSystem.append: " +ie.getMessage());
throw ie;
}
}
/**
@ -2225,14 +2248,6 @@ private LocatedBlock appendFileInt(String src, String holder, String clientMachi
"Append is not enabled on this NameNode. Use the " +
DFS_SUPPORT_APPEND_KEY + " configuration option to enable it.");
}
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* NameSystem.appendFile: src=" + src
+ ", holder=" + holder
+ ", clientMachine=" + clientMachine);
}
if (!DFSUtil.isValidName(src)) {
throw new InvalidPathException(src);
}
LocatedBlock lb = null;
FSPermissionChecker pc = getPermissionChecker();
@ -2245,9 +2260,7 @@ private LocatedBlock appendFileInt(String src, String holder, String clientMachi
throw new SafeModeException("Cannot append to file" + src, safeMode);
}
src = FSDirectory.resolvePath(src, pathComponents, dir);
lb = startFileInternal(pc, src, null, holder, clientMachine,
EnumSet.of(CreateFlag.APPEND),
false, blockManager.maxReplication, 0);
lb = appendFileInternal(pc, src, holder, clientMachine);
} catch (StandbyException se) {
skipSync = true;
throw se;

View File

@ -60,6 +60,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@ -502,7 +503,7 @@ public void testFileCreationError2() throws IOException {
DistributedFileSystem dfs = null;
try {
cluster.waitActive();
dfs = (DistributedFileSystem)cluster.getFileSystem();
dfs = cluster.getFileSystem();
DFSClient client = dfs.dfs;
// create a new file.
@ -562,7 +563,7 @@ public void testFileCreationError3() throws IOException {
DistributedFileSystem dfs = null;
try {
cluster.waitActive();
dfs = (DistributedFileSystem)cluster.getFileSystem();
dfs = cluster.getFileSystem();
DFSClient client = dfs.dfs;
// create a new file.
@ -703,7 +704,7 @@ public void testFileCreationNamenodeRestart() throws IOException {
stm4.close();
// verify that new block is associated with this file
DFSClient client = ((DistributedFileSystem)fs).dfs;
DFSClient client = fs.dfs;
LocatedBlocks locations = client.getNamenode().getBlockLocations(
file1.toString(), 0, Long.MAX_VALUE);
System.out.println("locations = " + locations.locatedBlockCount());
@ -951,7 +952,7 @@ public void testLeaseExpireHardLimit() throws Exception {
DistributedFileSystem dfs = null;
try {
cluster.waitActive();
dfs = (DistributedFileSystem)cluster.getFileSystem();
dfs = cluster.getFileSystem();
// create a new file.
final String f = DIR + "foo";
@ -1012,7 +1013,7 @@ public void testFsClose() throws Exception {
DistributedFileSystem dfs = null;
try {
cluster.waitActive();
dfs = (DistributedFileSystem)cluster.getFileSystem();
dfs = cluster.getFileSystem();
// create a new file.
final String f = DIR + "foofs";
@ -1044,7 +1045,7 @@ public void testFsCloseAfterClusterShutdown() throws IOException {
DistributedFileSystem dfs = null;
try {
cluster.waitActive();
dfs = (DistributedFileSystem)cluster.getFileSystem();
dfs = cluster.getFileSystem();
// create a new file.
final String f = DIR + "testFsCloseAfterClusterShutdown";
@ -1177,7 +1178,7 @@ public void testFileIdMismatch() throws IOException {
DistributedFileSystem dfs = null;
try {
cluster.waitActive();
dfs = (DistributedFileSystem)cluster.getFileSystem();
dfs = cluster.getFileSystem();
DFSClient client = dfs.dfs;
final Path f = new Path("/testFileIdMismatch.txt");

View File

@ -130,7 +130,7 @@ public void testImmediateRecoveryOfLease() throws Exception {
size = AppendTestUtil.nextInt(FILE_SIZE);
filepath = createFile("/immediateRecoverLease-longlease", size, false);
// test recoverLese from a different client
// test recoverLease from a different client
recoverLease(filepath, null);
verifyFile(dfs, filepath, actual, size);