HDFS-13377. The owner of folder can set quota for his sub folder. Contributed by Yang Yun.

This commit is contained in:
Ayush Saxena 2020-03-24 22:56:09 +05:30
parent 4454c6d14b
commit ea87d60493
6 changed files with 285 additions and 7 deletions

View File

@ -327,6 +327,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_PERMISSIONS_SUPERUSERGROUP_KEY = public static final String DFS_PERMISSIONS_SUPERUSERGROUP_KEY =
HdfsClientConfigKeys.DeprecatedKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY; HdfsClientConfigKeys.DeprecatedKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY;
public static final String DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT = "supergroup"; public static final String DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT = "supergroup";
public static final String DFS_PERMISSIONS_ALLOW_OWNER_SET_QUOTA_KEY
= "dfs.permissions.allow.owner.set.quota";
public static final boolean DFS_PERMISSIONS_ALLOW_OWNER_SET_QUOTA_DEFAULT
= false;
public static final String DFS_NAMENODE_ACLS_ENABLED_KEY = "dfs.namenode.acls.enabled"; public static final String DFS_NAMENODE_ACLS_ENABLED_KEY = "dfs.namenode.acls.enabled";
public static final boolean DFS_NAMENODE_ACLS_ENABLED_DEFAULT = true; public static final boolean DFS_NAMENODE_ACLS_ENABLED_DEFAULT = true;
public static final String DFS_NAMENODE_POSIX_ACL_INHERITANCE_ENABLED_KEY = public static final String DFS_NAMENODE_POSIX_ACL_INHERITANCE_ENABLED_KEY =

View File

@ -232,11 +232,21 @@ static long getPreferredBlockSize(FSDirectory fsd, FSPermissionChecker pc,
* Note: This does not support ".inodes" relative path. * Note: This does not support ".inodes" relative path.
*/ */
static void setQuota(FSDirectory fsd, FSPermissionChecker pc, String src, static void setQuota(FSDirectory fsd, FSPermissionChecker pc, String src,
long nsQuota, long ssQuota, StorageType type) throws IOException { long nsQuota, long ssQuota, StorageType type, boolean allowOwner)
throws IOException {
fsd.writeLock(); fsd.writeLock();
try { try {
INodesInPath iip = fsd.resolvePath(pc, src, DirOp.WRITE); INodesInPath iip = fsd.resolvePath(pc, src, DirOp.WRITE);
if (fsd.isPermissionEnabled() && !pc.isSuperUser() && allowOwner) {
INodeDirectory parentDir= iip.getLastINode().getParent();
if (parentDir == null ||
!parentDir.getUserName().equals(pc.getUser())) {
throw new AccessControlException(
"Access denied for user " + pc.getUser() +
". Superuser or owner of parent folder privilege is required");
}
}
INodeDirectory changed = INodeDirectory changed =
unprotectedSetQuota(fsd, iip, nsQuota, ssQuota, type); unprotectedSetQuota(fsd, iip, nsQuota, ssQuota, type);
if (changed != null) { if (changed != null) {

View File

@ -463,6 +463,8 @@ private void logAuditEvent(boolean succeeded,
// Batch size for open files response // Batch size for open files response
private final int maxListOpenFilesResponses; private final int maxListOpenFilesResponses;
private final boolean allowOwnerSetQuota;
// Scan interval is not configurable. // Scan interval is not configurable.
private static final long DELEGATION_TOKEN_REMOVER_SCAN_INTERVAL = private static final long DELEGATION_TOKEN_REMOVER_SCAN_INTERVAL =
TimeUnit.MILLISECONDS.convert(1, TimeUnit.HOURS); TimeUnit.MILLISECONDS.convert(1, TimeUnit.HOURS);
@ -977,7 +979,9 @@ static FSNamesystem loadFromDisk(Configuration conf) throws IOException {
DFSConfigKeys.DFS_NAMENODE_LIST_OPENFILES_NUM_RESPONSES + DFSConfigKeys.DFS_NAMENODE_LIST_OPENFILES_NUM_RESPONSES +
" must be a positive integer." " must be a positive integer."
); );
this.allowOwnerSetQuota = conf.getBoolean(
DFSConfigKeys.DFS_PERMISSIONS_ALLOW_OWNER_SET_QUOTA_KEY,
DFSConfigKeys.DFS_PERMISSIONS_ALLOW_OWNER_SET_QUOTA_DEFAULT);
this.blockDeletionIncrement = conf.getInt( this.blockDeletionIncrement = conf.getInt(
DFSConfigKeys.DFS_NAMENODE_BLOCK_DELETION_INCREMENT_KEY, DFSConfigKeys.DFS_NAMENODE_BLOCK_DELETION_INCREMENT_KEY,
DFSConfigKeys.DFS_NAMENODE_BLOCK_DELETION_INCREMENT_DEFAULT); DFSConfigKeys.DFS_NAMENODE_BLOCK_DELETION_INCREMENT_DEFAULT);
@ -3468,12 +3472,15 @@ void setQuota(String src, long nsQuota, long ssQuota, StorageType type)
final FSPermissionChecker pc = getPermissionChecker(); final FSPermissionChecker pc = getPermissionChecker();
FSPermissionChecker.setOperationType(operationName); FSPermissionChecker.setOperationType(operationName);
try { try {
checkSuperuserPrivilege(pc); if(!allowOwnerSetQuota) {
checkSuperuserPrivilege(pc);
}
writeLock(); writeLock();
try { try {
checkOperation(OperationCategory.WRITE); checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot set quota on " + src); checkNameNodeSafeMode("Cannot set quota on " + src);
FSDirAttrOp.setQuota(dir, pc, src, nsQuota, ssQuota, type); FSDirAttrOp.setQuota(dir, pc, src, nsQuota, ssQuota, type,
allowOwnerSetQuota);
} finally { } finally {
writeUnlock(operationName); writeUnlock(operationName);
} }

View File

@ -5834,4 +5834,13 @@
must be shorter than the window size. must be shorter than the window size.
</description> </description>
</property> </property>
<property>
<name>dfs.permissions.allow.owner.set.quota</name>
<value>false</value>
<description>
Whether the owner(not superuser) of a directory can set quota of his sub
directories when permissions is enabled. Default value is false;
</description>
</property>
</configuration> </configuration>

View File

@ -150,12 +150,12 @@ public static void tearDownClass() {
resetStream(); resetStream();
} }
private void runCommand(DFSAdmin admin, boolean expectError, String... args) static void runCommand(DFSAdmin admin, boolean expectError, String... args)
throws Exception { throws Exception {
runCommand(admin, args, expectError); runCommand(admin, args, expectError);
} }
private void runCommand(DFSAdmin admin, String args[], boolean expectEror) static void runCommand(DFSAdmin admin, String[] args, boolean expectEror)
throws Exception { throws Exception {
int val = admin.run(args); int val = admin.run(args);
if (expectEror) { if (expectEror) {

View File

@ -0,0 +1,248 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.tools.DFSAdmin;
import org.apache.hadoop.security.UserGroupInformation;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class TestQuotaAllowOwner {
private static Configuration conf;
private static MiniDFSCluster cluster;
private static DistributedFileSystem dfs;
@BeforeClass
public static void setUpClass() throws Exception {
conf = new HdfsConfiguration();
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 512);
conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ALLOW_OWNER_SET_QUOTA_KEY,
true);
restartCluster();
}
@AfterClass
public static void tearDownClass() {
if (cluster != null) {
cluster.shutdown();
cluster = null;
}
}
private static void restartCluster() throws IOException {
if (cluster != null) {
cluster.shutdown();
}
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
cluster.waitActive();
dfs = cluster.getFileSystem();
}
private void createDirssAndSetOwner(
String parentDir, String subDir, String owner, String group)
throws Exception {
Path parentPath = new Path(parentDir);
assertTrue(dfs.mkdirs(parentPath));
dfs.setOwner(parentPath, owner, group);
assertTrue(dfs.mkdirs(new Path(subDir)));
}
/**
* Test the owner(not admin) of directory can set the quota of
* it's sub-directories.
*/
@Test
public void testOwnerCanSetSubDirQuota() throws Exception {
final String userName = "user1";
final String groupName = "hadoop";
final String parentDir = "/parent_owner";
final String subDir = parentDir + "/subdir";
createDirssAndSetOwner(parentDir, subDir, userName, groupName);
final DFSAdmin admin = new DFSAdmin(conf);
// set quota with superuser.
String[] args = new String[]{"-setQuota", "10", parentDir.toString()};
TestQuota.runCommand(admin, args, false);
args = new String[]{"-setSpaceQuota", "128", parentDir.toString()};
TestQuota.runCommand(admin, args, false);
UserGroupInformation ugi = UserGroupInformation.createUserForTesting(
userName, new String[]{groupName});
ugi.doAs((PrivilegedExceptionAction<Object>) () -> {
assertEquals("Not running as new user", userName,
UserGroupInformation.getCurrentUser().getShortUserName());
DFSAdmin userAdmin = new DFSAdmin(conf);
String[] args2 = new String[]{"-setQuota", "5", subDir};
TestQuota.runCommand(userAdmin, args2, false);
args2 = new String[]{"-setSpaceQuota", "64", subDir};
TestQuota.runCommand(userAdmin, args2, false);
ContentSummary c = dfs.getContentSummary(new Path(subDir));
assertEquals("Not same with setting quota",
5, c.getQuota());
assertEquals("Not same with setting space quota",
64, c.getSpaceQuota());
args2 = new String[]{"-clrQuota", subDir};
TestQuota.runCommand(userAdmin, args2, false);
args2 = new String[]{"-clrSpaceQuota", subDir};
TestQuota.runCommand(userAdmin, args2, false);
c = dfs.getContentSummary(new Path(subDir));
assertEquals("Not clean quota", -1, c.getQuota());
assertEquals("Not clean space quota",
-1, c.getSpaceQuota());
return null;
});
}
/**
* Test the owner(not admin) of directory can set the quota of
* it's sub-directories even the admin never set quota to it.
*/
@Test
public void testOwnerCanSetSubDirQuotaWithoutAdminDone() throws Exception {
final String userName = "user1";
final String groupName = "hadoop";
final String parentDir = "/parent_owner_without_Admin";
final String subDir = parentDir + "/subdir";
createDirssAndSetOwner(parentDir, subDir, userName, groupName);
UserGroupInformation ugi2 = UserGroupInformation.createUserForTesting(
userName, new String[]{groupName});
ugi2.doAs((PrivilegedExceptionAction<Object>) () -> {
assertEquals("Not running as new user", userName,
UserGroupInformation.getCurrentUser().getShortUserName());
DFSAdmin userAdmin = new DFSAdmin(conf);
String[] args2 = new String[]{"-setQuota", "5", subDir};
TestQuota.runCommand(userAdmin, args2, false);
args2 = new String[]{"-setSpaceQuota", "64", subDir};
TestQuota.runCommand(userAdmin, args2, false);
return null;
});
}
/**
* Test other user(not admin) can NOT set the quota of sub-directories.
*/
@Test
public void testOtherCanNotSetSubDirQuota() throws Exception {
final String userName = "user1";
final String groupName = "hadoop";
final String userOther = "otherUser";
final String groupOther = "otherGroup";
final String parentDir = "/parent_other_user";
final String subDir = parentDir + "/subdir";
createDirssAndSetOwner(parentDir, subDir, userName, groupName);
UserGroupInformation ugi = UserGroupInformation.createUserForTesting(
userOther, new String[]{groupOther});
ugi.doAs((PrivilegedExceptionAction<Object>) () -> {
assertEquals("Not running as new user", userOther,
UserGroupInformation.getCurrentUser().getShortUserName());
DFSAdmin userAdmin = new DFSAdmin(conf);
String[] args2 = new String[]{"-setQuota", "5", subDir.toString()};
TestQuota.runCommand(userAdmin, args2, true);
args2 = new String[]{"-setSpaceQuota", "64", subDir.toString()};
TestQuota.runCommand(userAdmin, args2, true);
return null;
});
}
/**
* Test other user(not admin) with same group can NOT set the quota of
* sub-directories.
*/
@Test
public void testOtherInSameGroupCanNotSetSubDirQuota() throws Exception {
final String userName = "user1";
final String groupName = "hadoop";
final String userOther = "otherUser";
final String parentDir = "/parent_other_user_in_same_group";
final String subDir = parentDir + "/subdir";
createDirssAndSetOwner(parentDir, subDir, userName, groupName);
UserGroupInformation ugi = UserGroupInformation.createUserForTesting(
userOther, new String[]{groupName});
ugi.doAs((PrivilegedExceptionAction<Object>) () -> {
assertEquals("Not running as new user", userOther,
UserGroupInformation.getCurrentUser().getShortUserName());
DFSAdmin userAdmin = new DFSAdmin(conf);
String[] args2 = new String[]{"-setQuota", "5", subDir.toString()};
TestQuota.runCommand(userAdmin, args2, true);
args2 = new String[]{"-setSpaceQuota", "64", subDir.toString()};
TestQuota.runCommand(userAdmin, args2, true);
return null;
});
}
/**
* Test the owner(not admin) of directory can NOT set the quota of
* sub-directories if this feature is not enabled.
*/
@Test
public void testOwnerCanNotSetIfNotEanbled() throws Exception {
final String userName = "user1";
final String groupName = "hadoop";
final String parentDir = "/parent_owner_without_Admin";
final String subDir = parentDir + "/subdir";
try {
conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ALLOW_OWNER_SET_QUOTA_KEY,
false);
restartCluster();
createDirssAndSetOwner(parentDir, subDir, userName, groupName);
UserGroupInformation ugi = UserGroupInformation.createUserForTesting(
userName, new String[]{groupName});
ugi.doAs((PrivilegedExceptionAction<Object>) () -> {
assertEquals("Not running as new user", userName,
UserGroupInformation.getCurrentUser().getShortUserName());
DFSAdmin userAdmin = new DFSAdmin(conf);
String[] args2 = new String[]{"-setQuota", "5", subDir};
TestQuota.runCommand(userAdmin, args2, true);
args2 = new String[]{"-setSpaceQuota", "64", subDir};
TestQuota.runCommand(userAdmin, args2, true);
return null;
});
} finally {
conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ALLOW_OWNER_SET_QUOTA_KEY,
true);
restartCluster();
}
}
}