HADOOP-14820 Wasb mkdirs security checks inconsistent with HDFS.

Contributed by Sivaguru Sankaridurg
This commit is contained in:
Steve Loughran 2017-09-05 14:16:57 +01:00
parent 5dba54596a
commit 792eff9ea7
No known key found for this signature in database
GPG Key ID: 950CC3E032B79CA2
3 changed files with 77 additions and 14 deletions

View File

@ -2425,13 +2425,13 @@ PermissionStatus createPermissionStatus(FsPermission permission)
private Path getAncestor(Path f) throws IOException { private Path getAncestor(Path f) throws IOException {
for (Path current = f.getParent(), parent = current.getParent(); for (Path current = f, parent = current.getParent();
parent != null; // Stop when you get to the root parent != null; // Stop when you get to the root
current = parent, parent = current.getParent()) { current = parent, parent = current.getParent()) {
String currentKey = pathToKey(current); String currentKey = pathToKey(current);
FileMetadata currentMetadata = store.retrieveMetadata(currentKey); FileMetadata currentMetadata = store.retrieveMetadata(currentKey);
if (currentMetadata != null) { if (currentMetadata != null && currentMetadata.isDir()) {
Path ancestor = keyToPath(currentMetadata.getKey()); Path ancestor = keyToPath(currentMetadata.getKey());
LOG.debug("Found ancestor {}, for path: {}", ancestor.toString(), f.toString()); LOG.debug("Found ancestor {}, for path: {}", ancestor.toString(), f.toString());
return ancestor; return ancestor;
@ -2448,7 +2448,6 @@ public boolean mkdirs(Path f, FsPermission permission) throws IOException {
public boolean mkdirs(Path f, FsPermission permission, boolean noUmask) throws IOException { public boolean mkdirs(Path f, FsPermission permission, boolean noUmask) throws IOException {
LOG.debug("Creating directory: {}", f.toString()); LOG.debug("Creating directory: {}", f.toString());
if (containsColon(f)) { if (containsColon(f)) {
@ -2459,6 +2458,10 @@ public boolean mkdirs(Path f, FsPermission permission, boolean noUmask) throws I
Path absolutePath = makeAbsolute(f); Path absolutePath = makeAbsolute(f);
Path ancestor = getAncestor(absolutePath); Path ancestor = getAncestor(absolutePath);
if (absolutePath.equals(ancestor)) {
return true;
}
performAuthCheck(ancestor, WasbAuthorizationOperations.WRITE, "mkdirs", absolutePath); performAuthCheck(ancestor, WasbAuthorizationOperations.WRITE, "mkdirs", absolutePath);
PermissionStatus permissionStatus = null; PermissionStatus permissionStatus = null;

View File

@ -603,6 +603,70 @@ public void testMkdirsCheckPositive() throws Throwable {
} }
} }
/**
* Positive test for mkdirs -p with existing hierarchy
* @throws Throwable
*/
@Test
public void testMkdirsWithExistingHierarchyCheckPositive1() throws Throwable {
Path testPath = new Path("/testMkdirsWithExistingHierarchyCheckPositive1");
authorizer.addAuthRule("/", WasbAuthorizationOperations.WRITE.toString(), true);
fs.updateWasbAuthorizer(authorizer);
try {
fs.mkdirs(testPath);
ContractTestUtils.assertIsDirectory(fs, testPath);
/* Don't need permissions to create a directory that already exists */
authorizer.deleteAllAuthRules();
fs.mkdirs(testPath);
ContractTestUtils.assertIsDirectory(fs, testPath);
}
finally {
allowRecursiveDelete(fs, testPath.toString());
fs.delete(testPath, true);
}
}
@Test
public void testMkdirsWithExistingHierarchyCheckPositive2() throws Throwable {
Path testPath = new Path("/testMkdirsWithExistingHierarchyCheckPositive2");
Path childPath1 = new Path(testPath, "1");
Path childPath2 = new Path(childPath1, "2");
Path childPath3 = new Path(childPath2, "3");
authorizer.addAuthRule("/",
WasbAuthorizationOperations.WRITE.toString(), true);
authorizer.addAuthRule(childPath1.toString(),
WasbAuthorizationOperations.WRITE.toString(), true);
fs.updateWasbAuthorizer(authorizer);
try {
fs.mkdirs(childPath1);
ContractTestUtils.assertIsDirectory(fs, childPath1);
// Path already exists => no-op.
fs.mkdirs(testPath);
ContractTestUtils.assertIsDirectory(fs, testPath);
// Path already exists => no-op.
fs.mkdirs(childPath1);
ContractTestUtils.assertIsDirectory(fs, childPath1);
// Check permissions against existing ancestor childPath1
fs.mkdirs(childPath3);
ContractTestUtils.assertIsDirectory(fs, childPath3);
} finally {
allowRecursiveDelete(fs, testPath.toString());
fs.delete(testPath, true);
}
}
/** /**
* Negative test for mkdirs access check * Negative test for mkdirs access check
* @throws Throwable * @throws Throwable

View File

@ -108,17 +108,13 @@ public void testMetricsOnMkdirList() throws Exception {
// Create a directory // Create a directory
assertTrue(fs.mkdirs(new Path("a"))); assertTrue(fs.mkdirs(new Path("a")));
// At the time of writing, it takes 1 request to create the actual directory, // At the time of writing
// plus 2 requests per level to check that there's no blob with that name and // getAncestor uses 2 calls for each folder level /user/<name>/a
// 1 request per level above to create it if it doesn't exist. // plus 1 call made by checkContainer
// So for the path above (/user/<name>/a), it takes 2 requests each to check // mkdir checks the hierarchy with 2 calls per level
// there's no blob called /user, no blob called /user/<name> and no blob // mkdirs calls storeEmptyDir to create the empty folder, which makes 5 calls
// called /user/<name>/a, and then 3 request for the creation of the three // For a total of 7 + 6 + 5 = 18 web responses
// levels, and then 2 requests for checking/stamping the version of AS, base = assertWebResponsesInRange(base, 1, 18);
// totaling 11.
// Also, there's the initial 1 request for container check so total is 12.
// The getAncestor call at the very beginning adds another 4 calls, totalling 16.
base = assertWebResponsesInRange(base, 1, 16);
assertEquals(1, assertEquals(1,
AzureMetricsTestUtil.getLongCounterValue(getInstrumentation(), WASB_DIRECTORIES_CREATED)); AzureMetricsTestUtil.getLongCounterValue(getInstrumentation(), WASB_DIRECTORIES_CREATED));