HDFS-13668. FSPermissionChecker may throws AIOOE when check inode permission. Contributed by He Xiaoqiao.

This commit is contained in:
drankye 2018-08-13 17:32:56 +08:00
parent cadbc8b57f
commit 475bff6e8e
2 changed files with 41 additions and 4 deletions

View File

@ -409,7 +409,7 @@ private boolean hasPermission(INodeAttributes inode, FsAction access) {
}
final FsPermission mode = inode.getFsPermission();
final AclFeature aclFeature = inode.getAclFeature();
if (aclFeature != null) {
if (aclFeature != null && aclFeature.getEntriesSize() > 0) {
// It's possible that the inode has a default ACL but no access ACL.
int firstEntry = aclFeature.getEntryAt(0);
if (AclEntryStatusFormat.getScope(firstEntry) == AclEntryScope.ACCESS) {

View File

@ -57,6 +57,11 @@ public class TestINodeAttributeProvider {
public static class MyAuthorizationProvider extends INodeAttributeProvider {
public static class MyAccessControlEnforcer implements AccessControlEnforcer {
AccessControlEnforcer ace;
public MyAccessControlEnforcer(AccessControlEnforcer defaultEnforcer) {
this.ace = defaultEnforcer;
}
@Override
public void checkPermission(String fsOwner, String supergroup,
@ -65,6 +70,13 @@ public void checkPermission(String fsOwner, String supergroup,
int ancestorIndex, boolean doCheckOwner, FsAction ancestorAccess,
FsAction parentAccess, FsAction access, FsAction subAccess,
boolean ignoreEmptyDir) throws AccessControlException {
if (ancestorIndex > 1
&& inodes[1].getLocalName().equals("user")
&& inodes[2].getLocalName().equals("acl")) {
this.ace.checkPermission(fsOwner, supergroup, ugi, inodeAttrs, inodes,
pathByNameArr, snapshotId, path, ancestorIndex, doCheckOwner,
ancestorAccess, parentAccess, access, subAccess, ignoreEmptyDir);
}
CALLED.add("checkPermission|" + ancestorAccess + "|" + parentAccess + "|" + access);
}
}
@ -84,6 +96,7 @@ public INodeAttributes getAttributes(String[] pathElements,
final INodeAttributes inode) {
CALLED.add("getAttributes");
final boolean useDefault = useDefault(pathElements);
final boolean useNullAcl = useNullAclFeature(pathElements);
return new INodeAttributes() {
@Override
public boolean isDirectory() {
@ -126,7 +139,10 @@ public long getPermissionLong() {
@Override
public AclFeature getAclFeature() {
AclFeature f;
if (useDefault) {
if (useNullAcl) {
int[] entries = new int[0];
f = new AclFeature(entries);
} else if (useDefault) {
f = inode.getAclFeature();
} else {
AclEntry acl = new AclEntry.Builder().setType(AclEntryType.GROUP).
@ -167,8 +183,8 @@ public long getAccessTime() {
@Override
public AccessControlEnforcer getExternalAccessControlEnforcer(
AccessControlEnforcer deafultEnforcer) {
return new MyAccessControlEnforcer();
AccessControlEnforcer defaultEnforcer) {
return new MyAccessControlEnforcer(defaultEnforcer);
}
private boolean useDefault(String[] pathElements) {
@ -176,6 +192,11 @@ private boolean useDefault(String[] pathElements) {
!(pathElements[0].equals("user") && pathElements[1].equals("authz"));
}
private boolean useNullAclFeature(String[] pathElements) {
return (pathElements.length > 2)
&& pathElements[1].equals("user")
&& pathElements[2].equals("acl");
}
}
@Before
@ -368,4 +389,20 @@ public void testCustomProvider() throws Exception {
});
}
}
@Test
public void testAclFeature() throws Exception {
UserGroupInformation ugi = UserGroupInformation.createUserForTesting(
"testuser", new String[]{"testgroup"});
ugi.doAs((PrivilegedExceptionAction<Object>) () -> {
FileSystem fs = miniDFS.getFileSystem();
Path aclDir = new Path("/user/acl");
fs.mkdirs(aclDir);
Path aclChildDir = new Path(aclDir, "subdir");
fs.mkdirs(aclChildDir);
AclStatus aclStatus = fs.getAclStatus(aclDir);
Assert.assertEquals(0, aclStatus.getEntries().size());
return null;
});
}
}