HDFS-5802. NameNode does not check for inode type before traversing down a path. (Xiao Chen via Yongjun Zhang)

This commit is contained in:
Yongjun Zhang 2015-09-17 22:56:14 -07:00
parent 92c1af1646
commit 2ff6faf954
3 changed files with 67 additions and 3 deletions

View File

@ -932,6 +932,9 @@ Release 2.8.0 - UNRELEASED
HDFS-9022. Move NameNode.getAddress() and NameNode.getUri() to HDFS-9022. Move NameNode.getAddress() and NameNode.getUri() to
hadoop-hdfs-client. (Mingliang Liu via wheat9) hadoop-hdfs-client. (Mingliang Liu via wheat9)
HDFS-5802. NameNode does not check for inode type before traversing down a
path. (Xiao Chen via Yongjun Zhang)
OPTIMIZATIONS OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

View File

@ -192,6 +192,25 @@ void checkPermission(INodesInPath inodesInPath, boolean doCheckOwner,
ancestorAccess, parentAccess, access, subAccess, ignoreEmptyDir); ancestorAccess, parentAccess, access, subAccess, ignoreEmptyDir);
} }
/**
* Check whether exception e is due to an ancestor inode's not being
* directory.
*/
private void checkAncestorType(INode[] inodes, int ancestorIndex,
AccessControlException e) throws AccessControlException {
for (int i = 0; i <= ancestorIndex; i++) {
if (inodes[i] == null) {
break;
}
if (!inodes[i].isDirectory()) {
throw new AccessControlException(
e.getMessage() + " (Ancestor " + inodes[i].getFullPathName()
+ " is not a directory).");
}
}
throw e;
}
@Override @Override
public void checkPermission(String fsOwner, String supergroup, public void checkPermission(String fsOwner, String supergroup,
UserGroupInformation callerUgi, INodeAttributes[] inodeAttrs, UserGroupInformation callerUgi, INodeAttributes[] inodeAttrs,
@ -202,7 +221,11 @@ public void checkPermission(String fsOwner, String supergroup,
throws AccessControlException { throws AccessControlException {
for(; ancestorIndex >= 0 && inodes[ancestorIndex] == null; for(; ancestorIndex >= 0 && inodes[ancestorIndex] == null;
ancestorIndex--); ancestorIndex--);
try {
checkTraverse(inodeAttrs, path, ancestorIndex); checkTraverse(inodeAttrs, path, ancestorIndex);
} catch (AccessControlException e) {
checkAncestorType(inodes, ancestorIndex, e);
}
final INodeAttributes last = inodeAttrs[inodeAttrs.length - 1]; final INodeAttributes last = inodeAttrs[inodeAttrs.length - 1];
if (parentAccess != null && parentAccess.implies(FsAction.WRITE) if (parentAccess != null && parentAccess.implies(FsAction.WRITE)

View File

@ -22,6 +22,7 @@
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import java.io.DataOutputStream;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.security.PrivilegedExceptionAction; import java.security.PrivilegedExceptionAction;
@ -510,6 +511,43 @@ public FileSystem run() throws Exception {
} }
} }
@Test
public void testPermissionMessageOnNonDirAncestor()
throws IOException, InterruptedException {
FileSystem rootFs = FileSystem.get(conf);
Path p4 = new Path("/p4");
rootFs.mkdirs(p4);
rootFs.setOwner(p4, USER1_NAME, GROUP1_NAME);
final Path fpath = new Path("/p4/file");
DataOutputStream out = rootFs.create(fpath);
out.writeBytes("dhruba: " + fpath);
out.close();
rootFs.setOwner(fpath, USER1_NAME, GROUP1_NAME);
assertTrue(rootFs.exists(fpath));
fs = USER1.doAs(new PrivilegedExceptionAction<FileSystem>() {
@Override
public FileSystem run() throws Exception {
return FileSystem.get(conf);
}
});
final Path nfpath = new Path("/p4/file/nonexisting");
assertFalse(rootFs.exists(nfpath));
try {
fs.exists(nfpath);
fail("The exists call should have failed.");
} catch (AccessControlException e) {
assertTrue("Permission denied messages must carry file path",
e.getMessage().contains(fpath.getName()));
assertTrue("Permission denied messages must specify existing_file is not "
+ "a directory, when checked on /existing_file/non_existing_name",
e.getMessage().contains("is not a directory"));
}
}
/* Check if namenode performs permission checking correctly /* Check if namenode performs permission checking correctly
* for the given user for operations mkdir, open, setReplication, * for the given user for operations mkdir, open, setReplication,
* getFileInfo, isDirectory, exists, getContentLength, list, rename, * getFileInfo, isDirectory, exists, getContentLength, list, rename,