HDFS-14921. Remove SuperUser Check in Setting Storage Policy in FileStatus During Listing. Contributed by Ayush Saxena.

This commit is contained in:
Vinayakumar B 2019-10-24 12:14:09 +05:30
parent fd84ca5161
commit ee699dc26c
2 changed files with 37 additions and 11 deletions

View File

@ -73,14 +73,12 @@ static DirectoryListing getListingInt(FSDirectory fsd, FSPermissionChecker pc,
}
}
boolean isSuperUser = true;
if (fsd.isPermissionEnabled()) {
if (iip.getLastINode() != null && iip.getLastINode().isDirectory()) {
fsd.checkPathAccess(pc, iip, FsAction.READ_EXECUTE);
}
isSuperUser = pc.isSuperUser();
}
return getListing(fsd, iip, startAfter, needLocation, isSuperUser);
return getListing(fsd, iip, startAfter, needLocation);
}
/**
@ -210,11 +208,10 @@ private static byte getStoragePolicyID(byte inodePolicy, byte parentPolicy) {
* path
* @param startAfter the name to start listing after
* @param needLocation if block locations are returned
* @param includeStoragePolicy if storage policy is returned
* @return a partial listing starting after startAfter
*/
private static DirectoryListing getListing(FSDirectory fsd, INodesInPath iip,
byte[] startAfter, boolean needLocation, boolean includeStoragePolicy)
byte[] startAfter, boolean needLocation)
throws IOException {
if (FSDirectory.isExactReservedName(iip.getPathComponents())) {
return getReservedListing(fsd);
@ -231,9 +228,7 @@ private static DirectoryListing getListing(FSDirectory fsd, INodesInPath iip,
return null;
}
byte parentStoragePolicy = includeStoragePolicy
? targetNode.getStoragePolicyID()
: HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED;
byte parentStoragePolicy = targetNode.getStoragePolicyID();
if (!targetNode.isDirectory()) {
// return the file's status. note that the iip already includes the
@ -255,7 +250,8 @@ private static DirectoryListing getListing(FSDirectory fsd, INodesInPath iip,
HdfsFileStatus listing[] = new HdfsFileStatus[numOfListing];
for (int i = 0; i < numOfListing && locationBudget > 0; i++) {
INode child = contents.get(startChild+i);
byte childStoragePolicy = (includeStoragePolicy && !child.isSymlink())
byte childStoragePolicy =
!child.isSymlink()
? getStoragePolicyID(child.getLocalStoragePolicyID(),
parentStoragePolicy)
: parentStoragePolicy;

View File

@ -79,6 +79,7 @@
import org.apache.hadoop.fs.StorageStatistics.LongStatistic;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DistributedFileSystem.HdfsDataOutputStreamBuilder;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
@ -90,6 +91,7 @@
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
@ -1879,6 +1881,34 @@ public Void run() throws Exception {
}
}
@Test
public void testListingStoragePolicyNonSuperUser() throws Exception {
HdfsConfiguration conf = new HdfsConfiguration();
try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build()) {
cluster.waitActive();
final DistributedFileSystem dfs = cluster.getFileSystem();
Path dir = new Path("/dir");
dfs.mkdirs(dir);
dfs.setPermission(dir,
new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
// Create a non-super user.
UserGroupInformation user = UserGroupInformation.createUserForTesting(
"Non_SuperUser", new String[] {"Non_SuperGroup"});
DistributedFileSystem userfs = (DistributedFileSystem) user.doAs(
(PrivilegedExceptionAction<FileSystem>) () -> FileSystem.get(conf));
Path sDir = new Path("/dir/sPolicy");
userfs.mkdirs(sDir);
userfs.setStoragePolicy(sDir, "COLD");
HdfsFileStatus[] list = userfs.getClient()
.listPaths(dir.toString(), HdfsFileStatus.EMPTY_NAME)
.getPartialListing();
assertEquals(HdfsConstants.COLD_STORAGE_POLICY_ID,
list[0].getStoragePolicy());
}
}
@Test
public void testRemoveErasureCodingPolicy() throws Exception {
Configuration conf = getTestConfiguration();