HDFS-16983. Fix concat operation to honor dfs.permissions.enabled (#5561). Contributed by caozhiqiang.

Reviewed-by: zhangshuyan <zqingchai@gmail.com>
Reviewed-by: He Xiaoqiao <hexiaoqiao@apache.org>
Signed-off-by: Ayush Saxena <ayushsaxena@apache.org>
caozhiqiang authored on 2023-06-05 19:12:59 +08:00; committed by GitHub
parent 241398de3b
commit 5d6ca13c5c
2 changed files with 96 additions and 1 deletion

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirConcatOp.java

@@ -121,7 +121,7 @@ private static INodeFile[] verifySrcFiles(FSDirectory fsd, String[] srcs,
     for(String src : srcs) {
       final INodesInPath iip = fsd.resolvePath(pc, src, DirOp.WRITE);
       // permission check for srcs
-      if (pc != null) {
+      if (pc != null && fsd.isPermissionEnabled()) {
        fsd.checkPathAccess(pc, iip, FsAction.READ); // read the file
        fsd.checkParentAccess(pc, iip, FsAction.WRITE); // for delete
      }
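The gist of the fix is the one-line guard above: the per-src permission checks now run only when a permission checker is present and the cluster-wide dfs.permissions.enabled switch is on. Below is a minimal, runnable sketch of this gating pattern; PermissionChecker and Directory are simplified stand-ins for illustration only, not the real FSPermissionChecker and FSDirectory classes.

// A minimal, runnable sketch of the gating pattern above. PermissionChecker
// and Directory are simplified stand-ins, not the real HDFS classes.
import java.util.Arrays;
import java.util.List;

public class ConcatPermissionGateSketch {

  static class PermissionChecker {
    final String user;
    PermissionChecker(String user) { this.user = user; }
  }

  static class Directory {
    final boolean permissionEnabled; // mirrors dfs.permissions.enabled
    Directory(boolean permissionEnabled) { this.permissionEnabled = permissionEnabled; }
    boolean isPermissionEnabled() { return permissionEnabled; }
  }

  static void verifySrcFiles(Directory fsd, PermissionChecker pc, List<String> srcs) {
    for (String src : srcs) {
      // The fix: run the checks only when a checker is present AND the
      // cluster-wide permission switch is on.
      if (pc != null && fsd.isPermissionEnabled()) {
        System.out.printf("checking READ on %s and WRITE on its parent for user %s%n", src, pc.user);
      }
    }
  }

  public static void main(String[] args) {
    List<String> srcs = Arrays.asList("/dir/src");
    // Permissions disabled: no check runs, so concat proceeds for any user.
    verifySrcFiles(new Directory(false), new PermissionChecker("theDoctor"), srcs);
    // Permissions enabled: the checks run as before the fix.
    verifySrcFiles(new Directory(true), new PermissionChecker("theDoctor"), srcs);
  }
}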

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestHDFSConcat.java

@@ -34,6 +34,7 @@
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
@@ -43,6 +44,7 @@
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.test.LambdaTestUtils;
@@ -564,4 +566,97 @@ public void testConcatOnSameFile() throws Exception {
     assertEquals(1, dfs.getContentSummary(new Path(dir)).getFileCount());
   }
 
+  /**
+   * Verifies that concat is rejected for a wrong user when
+   * dfs.permissions.enabled is true, and succeeds when it is false.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testConcatPermissionEnabled() throws Exception {
+    Configuration conf2 = new Configuration();
+    conf2.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
+    conf2.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true);
+    MiniDFSCluster cluster2 = new MiniDFSCluster.Builder(conf2).numDataNodes(REPL_FACTOR).build();
+    try {
+      cluster2.waitClusterUp();
+      DistributedFileSystem dfs2 = cluster2.getFileSystem();
+      String testPathDir = "/dir2";
+      Path dir = new Path(testPathDir);
+      dfs2.mkdirs(dir);
+      Path trg = new Path(testPathDir, "trg");
+      Path src = new Path(testPathDir, "src");
+      DFSTestUtil.createFile(dfs2, trg, blockSize, REPL_FACTOR, 1);
+      DFSTestUtil.createFile(dfs2, src, blockSize, REPL_FACTOR, 1);
+      // Check permissions with the wrong user when dfs.permissions.enabled is true.
+      final UserGroupInformation user =
+          UserGroupInformation.createUserForTesting("theDoctor", new String[] {"tardis"});
+      DistributedFileSystem hdfs1 =
+          (DistributedFileSystem) DFSTestUtil.getFileSystemAs(user, conf2);
+      LambdaTestUtils.intercept(AccessControlException.class,
+          "Permission denied: user=theDoctor, access=WRITE",
+          () -> hdfs1.concat(trg, new Path[] {src}));
+      // Shut down the first cluster before starting a new one with permission checking disabled.
+      cluster2.shutdown();
+      conf2.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, false);
+      cluster2 = new MiniDFSCluster.Builder(conf2).numDataNodes(REPL_FACTOR).build();
+      cluster2.waitClusterUp();
+      dfs2 = cluster2.getFileSystem();
+      dfs2.mkdirs(dir);
+      DFSTestUtil.createFile(dfs2, trg, blockSize, REPL_FACTOR, 1);
+      DFSTestUtil.createFile(dfs2, src, blockSize, REPL_FACTOR, 1);
+      // Check permissions with the wrong user when dfs.permissions.enabled is false.
+      DistributedFileSystem hdfs2 =
+          (DistributedFileSystem) DFSTestUtil.getFileSystemAs(user, conf2);
+      hdfs2.concat(trg, new Path[] {src});
+    } finally {
+      if (cluster2 != null) {
+        cluster2.shutdown();
+      }
+    }
+  }
+
+  /**
+   * Tests the permission checks of the concat operation.
+   */
+  @Test
+  public void testConcatPermissions() throws Exception {
+    String testPathDir = "/dir";
+    Path dir = new Path(testPathDir);
+    dfs.mkdirs(dir);
+    dfs.setPermission(dir, new FsPermission((short) 0777));
+    Path dst = new Path(testPathDir, "dst");
+    Path src = new Path(testPathDir, "src");
+    DFSTestUtil.createFile(dfs, dst, blockSize, REPL_FACTOR, 1);
+    // Create a user who is not the owner of the file and try the concat operation as that user.
+    final UserGroupInformation user =
+        UserGroupInformation.createUserForTesting("theDoctor", new String[] {"group"});
+    DistributedFileSystem dfs2 = (DistributedFileSystem) DFSTestUtil.getFileSystemAs(user, conf);
+    // Test 1: the user is not the owner of the file and has both src and dst permission.
+    DFSTestUtil.createFile(dfs, src, blockSize, REPL_FACTOR, 1);
+    dfs.setPermission(dst, new FsPermission((short) 0777));
+    dfs.setPermission(src, new FsPermission((short) 0777));
+    dfs2.concat(dst, new Path[] {src});
+    // Test 2: the user is not the owner of the file and has only dst permission.
+    DFSTestUtil.createFile(dfs, src, blockSize, REPL_FACTOR, 1);
+    dfs.setPermission(dst, new FsPermission((short) 0777));
+    dfs.setPermission(src, new FsPermission((short) 0700));
+    LambdaTestUtils.intercept(AccessControlException.class,
+        "Permission denied: user=theDoctor, access=READ",
+        () -> dfs2.concat(dst, new Path[] {src}));
+    // Test 3: the user is not the owner of the file and has only src permission.
+    DFSTestUtil.createFile(dfs, src, blockSize, REPL_FACTOR, 1);
+    dfs.setPermission(dst, new FsPermission((short) 0700));
+    dfs.setPermission(src, new FsPermission((short) 0777));
+    LambdaTestUtils.intercept(AccessControlException.class,
+        "Permission denied: user=theDoctor, access=WRITE",
+        () -> dfs2.concat(dst, new Path[] {src}));
+  }
 }
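For reference, the tests above drive concat through the public client API. A minimal sketch of that call path follows; the NameNode URI and paths here are hypothetical, and note that dfs.permissions.enabled is evaluated on the NameNode side, not by the client.

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ConcatClientSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // hdfs://localhost:8020 is a hypothetical NameNode address.
    FileSystem fs = FileSystem.get(URI.create("hdfs://localhost:8020"), conf);
    Path trg = new Path("/dir/trg");
    Path[] srcs = { new Path("/dir/src") };
    // Appends the blocks of each src to trg and removes the srcs. With the
    // fix above, the NameNode enforces READ on each src and WRITE on its
    // parent (for the delete) only when dfs.permissions.enabled is true.
    fs.concat(trg, srcs);
    fs.close();
  }
}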