HDFS-7415. Move FSNameSystem.resolvePath() to FSDirectory. Contributed by Haohui Mai.

This commit is contained in:
Haohui Mai 2014-11-20 19:21:23 -08:00
parent 90194ca1cb
commit c95b878abf
3 changed files with 74 additions and 60 deletions

View File

@ -381,6 +381,8 @@ Release 2.7.0 - UNRELEASED
HDFS-7413. Some unit tests should use NameNodeProtocols instead of
FSNameSystem. (wheat9)
HDFS-7415. Move FSNameSystem.resolvePath() to FSDirectory. (wheat9)
OPTIMIZATIONS
BUG FIXES

View File

@ -151,6 +151,8 @@ private static INodeDirectory createRoot(FSNamesystem namesystem) {
// lock to protect the directory and BlockMap
private final ReentrantReadWriteLock dirLock;
private final boolean isPermissionEnabled;
// utility methods to acquire and release read lock and write lock
void readLock() {
this.dirLock.readLock().lock();
@ -197,6 +199,9 @@ public int getWriteHoldCount() {
this.dirLock = new ReentrantReadWriteLock(true); // fair
rootDir = createRoot(ns);
inodeMap = INodeMap.newInstance(rootDir);
this.isPermissionEnabled = conf.getBoolean(
DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY,
DFSConfigKeys.DFS_PERMISSIONS_ENABLED_DEFAULT);
int configuredLimit = conf.getInt(
DFSConfigKeys.DFS_LIST_LIMIT, DFSConfigKeys.DFS_LIST_LIMIT_DEFAULT);
this.lsLimit = configuredLimit>0 ?
@ -838,6 +843,29 @@ private static void validateRenameSource(String src, INodesInPath srcIIP)
checkSnapshot(srcInode, null);
}
/**
* This is a wrapper for resolvePath(). If the path passed
* is prefixed with /.reserved/raw, then it checks to ensure that the caller
* has super user has super user privileges.
*
* @param pc The permission checker used when resolving path.
* @param path The path to resolve.
* @param pathComponents path components corresponding to the path
* @return if the path indicates an inode, return path after replacing up to
* <inodeid> with the corresponding path of the inode, else the path
* in {@code src} as is. If the path refers to a path in the "raw"
* directory, return the non-raw pathname.
* @throws FileNotFoundException
* @throws AccessControlException
*/
String resolvePath(FSPermissionChecker pc, String path, byte[][] pathComponents)
throws FileNotFoundException, AccessControlException {
if (isReservedRawName(path) && isPermissionEnabled) {
pc.checkSuperuserPrivilege();
}
return resolvePath(path, pathComponents, this);
}
private class RenameOperation {
private final INodesInPath srcIIP;
private final INodesInPath dstIIP;

View File

@ -1706,7 +1706,7 @@ private void setPermissionInt(final String srcArg, FsPermission permission)
try {
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot set permission for " + src);
src = resolvePath(src, pathComponents);
src = dir.resolvePath(pc, src, pathComponents);
checkOwner(pc, src);
dir.setPermission(src, permission);
getEditLog().logSetPermissions(src, permission);
@ -1745,7 +1745,7 @@ private void setOwnerInt(final String srcArg, String username, String group)
try {
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot set owner for " + src);
src = resolvePath(src, pathComponents);
src = dir.resolvePath(pc, src, pathComponents);
checkOwner(pc, src);
if (!pc.isSuperUser()) {
if (username != null && !pc.getUser().equals(username)) {
@ -1862,7 +1862,7 @@ private LocatedBlocks getBlockLocationsUpdateTimes(final String srcArg,
writeLock(); // writelock is needed to set accesstime
}
try {
src = resolvePath(src, pathComponents);
src = dir.resolvePath(pc, src, pathComponents);
if (isReadOp) {
checkOperation(OperationCategory.READ);
} else {
@ -2155,7 +2155,7 @@ private void setTimesInt(final String srcArg, long mtime, long atime)
try {
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot set times " + src);
src = resolvePath(src, pathComponents);
src = dir.resolvePath(pc, src, pathComponents);
// Write access is required to set access and modification times
if (isPermissionEnabled) {
@ -2223,7 +2223,7 @@ private void createSymlinkInt(String target, final String linkArg,
try {
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot create symlink " + link);
link = resolvePath(link, pathComponents);
link = dir.resolvePath(pc, link, pathComponents);
if (!createParent) {
verifyParentDir(link);
}
@ -2283,7 +2283,7 @@ private boolean setReplicationInt(final String srcArg,
try {
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot set replication for " + src);
src = resolvePath(src, pathComponents);
src = dir.resolvePath(pc, src, pathComponents);
if (isPermissionEnabled) {
checkPathAccess(pc, src, FsAction.WRITE);
}
@ -2390,7 +2390,7 @@ long getPreferredBlockSize(String filename)
readLock();
try {
checkOperation(OperationCategory.READ);
filename = resolvePath(filename, pathComponents);
filename = dir.resolvePath(pc, filename, pathComponents);
if (isPermissionEnabled) {
checkTraverse(pc, filename);
}
@ -2582,7 +2582,7 @@ private HdfsFileStatus startFileInt(final String srcArg,
if (provider != null) {
readLock();
try {
src = resolvePath(src, pathComponents);
src = dir.resolvePath(pc, src, pathComponents);
INodesInPath iip = dir.getINodesInPath4Write(src);
// Nothing to do if the path is not within an EZ
final EncryptionZone zone = dir.getEZForPath(iip);
@ -2618,7 +2618,7 @@ private HdfsFileStatus startFileInt(final String srcArg,
try {
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot create file" + src);
src = resolvePath(src, pathComponents);
src = dir.resolvePath(pc, src, pathComponents);
toRemoveBlocks = startFileInternal(pc, src, permissions, holder,
clientMachine, create, overwrite, createParent, replication,
blockSize, isLazyPersist, suite, protocolVersion, edek, logRetryCache);
@ -2934,7 +2934,7 @@ boolean recoverLease(String src, String holder, String clientMachine)
try {
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot recover the lease of " + src);
src = resolvePath(src, pathComponents);
src = dir.resolvePath(pc, src, pathComponents);
final INodeFile inode = INodeFile.valueOf(dir.getINode(src), src);
if (!inode.isUnderConstruction()) {
return true;
@ -3081,7 +3081,7 @@ private LocatedBlock appendFileInt(final String srcArg, String holder,
try {
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot append to file" + src);
src = resolvePath(src, pathComponents);
src = dir.resolvePath(pc, src, pathComponents);
lb = appendFileInternal(pc, src, holder, clientMachine, logRetryCache);
} catch (StandbyException se) {
skipSync = true;
@ -3146,10 +3146,11 @@ LocatedBlock getAdditionalBlock(String src, long fileId, String clientName,
// Part I. Analyze the state of the file with respect to the input data.
checkOperation(OperationCategory.READ);
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
FSPermissionChecker pc = getPermissionChecker();
readLock();
try {
checkOperation(OperationCategory.READ);
src = resolvePath(src, pathComponents);
src = dir.resolvePath(pc, src, pathComponents);
LocatedBlock[] onRetryBlock = new LocatedBlock[1];
FileState fileState = analyzeFileState(
src, fileId, clientName, previous, onRetryBlock);
@ -3394,12 +3395,13 @@ LocatedBlock getAdditionalDatanode(String src, long fileId,
final List<DatanodeStorageInfo> chosen;
checkOperation(OperationCategory.READ);
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
FSPermissionChecker pc = getPermissionChecker();
readLock();
try {
checkOperation(OperationCategory.READ);
//check safe mode
checkNameNodeSafeMode("Cannot add datanode; src=" + src + ", blk=" + blk);
src = resolvePath(src, pathComponents);
src = dir.resolvePath(pc, src, pathComponents);
//check lease
final INode inode;
@ -3450,12 +3452,13 @@ boolean abandonBlock(ExtendedBlock b, long fileId, String src, String holder)
}
checkOperation(OperationCategory.WRITE);
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
FSPermissionChecker pc = getPermissionChecker();
waitForLoadingFSImage();
writeLock();
try {
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot abandon block " + b + " for file" + src);
src = resolvePath(src, pathComponents);
src = dir.resolvePath(pc, src, pathComponents);
final INode inode;
if (fileId == INodeId.GRANDFATHER_INODE_ID) {
@ -3549,12 +3552,13 @@ boolean completeFile(final String srcArg, String holder,
boolean success = false;
checkOperation(OperationCategory.WRITE);
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
FSPermissionChecker pc = getPermissionChecker();
waitForLoadingFSImage();
writeLock();
try {
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot complete file " + src);
src = resolvePath(src, pathComponents);
src = dir.resolvePath(pc, src, pathComponents);
success = completeFileInternal(src, holder,
ExtendedBlock.getLocalBlock(last), fileId);
} finally {
@ -3756,8 +3760,8 @@ private boolean renameToInt(final String srcArg, final String dstArg,
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot rename " + src);
waitForLoadingFSImage();
src = resolvePath(src, srcComponents);
dst = resolvePath(dst, dstComponents);
src = dir.resolvePath(pc, src, srcComponents);
dst = dir.resolvePath(pc, dst, dstComponents);
checkOperation(OperationCategory.WRITE);
status = renameToInternal(pc, src, dst, logRetryCache);
if (status) {
@ -3833,8 +3837,8 @@ void renameTo(final String srcArg, final String dstArg,
try {
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot rename " + src);
src = resolvePath(src, srcComponents);
dst = resolvePath(dst, dstComponents);
src = dir.resolvePath(pc, src, srcComponents);
dst = dir.resolvePath(pc, dst, dstComponents);
renameToInternal(pc, src, dst, cacheEntry != null,
collectedBlocks, options);
resultingStat = getAuditFileInfo(dst, false);
@ -3952,7 +3956,7 @@ private boolean deleteInternal(String src, boolean recursive,
try {
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot delete " + src);
src = resolvePath(src, pathComponents);
src = dir.resolvePath(pc, src, pathComponents);
if (!recursive && dir.isNonEmptyDirectory(src)) {
throw new PathIsNotEmptyDirectoryException(src + " is non empty");
}
@ -4121,7 +4125,7 @@ HdfsFileStatus getFileInfo(final String srcArg, boolean resolveLink)
readLock();
try {
checkOperation(OperationCategory.READ);
src = resolvePath(src, pathComponents);
src = dir.resolvePath(pc, src, pathComponents);
boolean isSuperUser = true;
if (isPermissionEnabled) {
checkPermission(pc, src, false, null, null, null, null, false,
@ -4152,7 +4156,7 @@ boolean isFileClosed(final String srcArg)
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
readLock();
try {
src = resolvePath(src, pathComponents);
src = dir.resolvePath(pc, src, pathComponents);
checkOperation(OperationCategory.READ);
if (isPermissionEnabled) {
checkTraverse(pc, src);
@ -4201,7 +4205,7 @@ private boolean mkdirsInt(final String srcArg, PermissionStatus permissions,
try {
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot create directory " + src);
src = resolvePath(src, pathComponents);
src = dir.resolvePath(pc, src, pathComponents);
status = mkdirsInternal(pc, src, permissions, createParent);
if (status) {
resultingStat = getAuditFileInfo(src, false);
@ -4378,7 +4382,7 @@ ContentSummary getContentSummary(final String srcArg) throws IOException {
boolean success = true;
try {
checkOperation(OperationCategory.READ);
src = resolvePath(src, pathComponents);
src = dir.resolvePath(pc, src, pathComponents);
if (isPermissionEnabled) {
checkPermission(pc, src, false, null, null, null, FsAction.READ_EXECUTE);
}
@ -4435,12 +4439,13 @@ void fsync(String src, long fileId, String clientName, long lastBlockLength)
checkOperation(OperationCategory.WRITE);
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
FSPermissionChecker pc = getPermissionChecker();
waitForLoadingFSImage();
writeLock();
try {
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot fsync file " + src);
src = resolvePath(src, pathComponents);
src = dir.resolvePath(pc, src, pathComponents);
final INode inode;
if (fileId == INodeId.GRANDFATHER_INODE_ID) {
// Older clients may not have given us an inode ID to work with.
@ -4918,7 +4923,7 @@ private DirectoryListing getListingInt(final String srcArg, byte[] startAfter,
readLock();
try {
checkOperation(OperationCategory.READ);
src = resolvePath(src, pathComponents);
src = dir.resolvePath(pc, src, pathComponents);
// Get file name when startAfter is an INodePath
if (FSDirectory.isReservedName(startAfterString)) {
@ -6447,28 +6452,6 @@ private void checkTraverse(FSPermissionChecker pc, String path)
checkPermission(pc, path, false, null, null, null, null);
}
/**
* This is a wrapper for FSDirectory.resolvePath(). If the path passed
* is prefixed with /.reserved/raw, then it checks to ensure that the caller
* has super user privs.
*
* @param path The path to resolve.
* @param pathComponents path components corresponding to the path
* @return if the path indicates an inode, return path after replacing up to
* <inodeid> with the corresponding path of the inode, else the path
* in {@code src} as is. If the path refers to a path in the "raw"
* directory, return the non-raw pathname.
* @throws FileNotFoundException
* @throws AccessControlException
*/
private String resolvePath(String path, byte[][] pathComponents)
throws FileNotFoundException, AccessControlException {
if (FSDirectory.isReservedRawName(path)) {
checkSuperuserPrivilege();
}
return FSDirectory.resolvePath(path, pathComponents, dir);
}
@Override
public void checkSuperuserPrivilege()
throws AccessControlException {
@ -8576,7 +8559,7 @@ void modifyAclEntries(final String srcArg, List<AclEntry> aclSpec)
try {
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot modify ACL entries on " + src);
src = resolvePath(src, pathComponents);
src = dir.resolvePath(pc, src, pathComponents);
checkOwner(pc, src);
List<AclEntry> newAcl = dir.modifyAclEntries(src, aclSpec);
getEditLog().logSetAcl(src, newAcl);
@ -8603,7 +8586,7 @@ void removeAclEntries(final String srcArg, List<AclEntry> aclSpec)
try {
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot remove ACL entries on " + src);
src = resolvePath(src, pathComponents);
src = dir.resolvePath(pc, src, pathComponents);
checkOwner(pc, src);
List<AclEntry> newAcl = dir.removeAclEntries(src, aclSpec);
getEditLog().logSetAcl(src, newAcl);
@ -8629,7 +8612,7 @@ void removeDefaultAcl(final String srcArg) throws IOException {
try {
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot remove default ACL entries on " + src);
src = resolvePath(src, pathComponents);
src = dir.resolvePath(pc, src, pathComponents);
checkOwner(pc, src);
List<AclEntry> newAcl = dir.removeDefaultAcl(src);
getEditLog().logSetAcl(src, newAcl);
@ -8655,7 +8638,7 @@ void removeAcl(final String srcArg) throws IOException {
try {
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot remove ACL on " + src);
src = resolvePath(src, pathComponents);
src = dir.resolvePath(pc, src, pathComponents);
checkOwner(pc, src);
dir.removeAcl(src);
getEditLog().logSetAcl(src, AclFeature.EMPTY_ENTRY_LIST);
@ -8681,7 +8664,7 @@ void setAcl(final String srcArg, List<AclEntry> aclSpec) throws IOException {
try {
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot set ACL on " + src);
src = resolvePath(src, pathComponents);
src = dir.resolvePath(pc, src, pathComponents);
checkOwner(pc, src);
List<AclEntry> newAcl = dir.setAcl(src, aclSpec);
getEditLog().logSetAcl(src, newAcl);
@ -8705,7 +8688,7 @@ AclStatus getAclStatus(String src) throws IOException {
readLock();
try {
checkOperation(OperationCategory.READ);
src = resolvePath(src, pathComponents);
src = dir.resolvePath(pc, src, pathComponents);
if (isPermissionEnabled) {
checkPermission(pc, src, false, null, null, null, null);
}
@ -8781,12 +8764,13 @@ private void createEncryptionZoneInt(final String srcArg, String cipher,
checkOperation(OperationCategory.WRITE);
final byte[][] pathComponents =
FSDirectory.getPathComponentsForReservedPath(src);
FSPermissionChecker pc = getPermissionChecker();
writeLock();
try {
checkSuperuserPrivilege();
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot create encryption zone on " + src);
src = resolvePath(src, pathComponents);
src = dir.resolvePath(pc, src, pathComponents);
final CipherSuite suite = CipherSuite.convert(cipher);
// For now this is hardcoded, as we only support one method.
@ -8828,7 +8812,7 @@ EncryptionZone getEZForPath(final String srcArg)
checkPathAccess(pc, src, FsAction.READ);
}
checkOperation(OperationCategory.READ);
src = resolvePath(src, pathComponents);
src = dir.resolvePath(pc, src, pathComponents);
final INodesInPath iip = dir.getINodesInPath(src, true);
final EncryptionZone ret = dir.getEZForPath(iip);
resultingStat = getAuditFileInfo(src, false);
@ -8907,7 +8891,7 @@ private void setXAttrInt(final String srcArg, XAttr xAttr,
try {
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot set XAttr on " + src);
src = resolvePath(src, pathComponents);
src = dir.resolvePath(pc, src, pathComponents);
checkXAttrChangeAccess(src, xAttr, pc);
List<XAttr> xAttrs = Lists.newArrayListWithCapacity(1);
xAttrs.add(xAttr);
@ -8960,7 +8944,7 @@ List<XAttr> getXAttrs(final String srcArg, List<XAttr> xAttrs)
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
readLock();
try {
src = resolvePath(src, pathComponents);
src = dir.resolvePath(pc, src, pathComponents);
checkOperation(OperationCategory.READ);
if (isPermissionEnabled) {
checkPathAccess(pc, src, FsAction.READ);
@ -9008,7 +8992,7 @@ List<XAttr> listXAttrs(String src) throws IOException {
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
readLock();
try {
src = resolvePath(src, pathComponents);
src = dir.resolvePath(pc, src, pathComponents);
checkOperation(OperationCategory.READ);
if (isPermissionEnabled) {
/* To access xattr names, you need EXECUTE in the owning directory. */
@ -9069,7 +9053,7 @@ void removeXAttrInt(final String srcArg, XAttr xAttr, boolean logRetryCache)
try {
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot remove XAttr entry on " + src);
src = resolvePath(src, pathComponents);
src = dir.resolvePath(pc, src, pathComponents);
checkXAttrChangeAccess(src, xAttr, pc);
List<XAttr> xAttrs = Lists.newArrayListWithCapacity(1);