HDFS-15689. allow/disallowSnapshot on EZ roots shouldn't fail due to trash provisioning/emptiness check (#2472)

This commit is contained in:
Siyao Meng 2020-11-25 11:01:04 -08:00 committed by GitHub
parent ac7045b75f
commit 235947e282
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 78 additions and 4 deletions

View File

@ -3158,6 +3158,17 @@ boolean isHDFSEncryptionEnabled() throws IOException {
return getKeyProviderUri() != null;
}
/**
* @return true if p is an encryption zone root
*/
boolean isEZRoot(Path p) throws IOException {
EncryptionZone ez = getEZForPath(p.toUri().getPath());
if (ez == null) {
return false;
}
return ez.getPath().equals(p.toString());
}
boolean isSnapshotTrashRootEnabled() throws IOException {
return getServerDefaults().getSnapshotTrashRootEnabled();
}

View File

@ -2122,6 +2122,12 @@ public Void next(final FileSystem fs, final Path p)
* @param p Path to a directory.
*/
private void checkTrashRootAndRemoveIfEmpty(final Path p) throws IOException {
// If p is EZ root, skip the check
if (dfs.isHDFSEncryptionEnabled() && dfs.isEZRoot(p)) {
DFSClient.LOG.debug("{} is an encryption zone root. "
+ "Skipping empty trash root check.", p);
return;
}
Path trashRoot = new Path(p, FileSystem.TRASH_PREFIX);
try {
// listStatus has 4 possible outcomes here:
@ -2139,9 +2145,10 @@ private void checkTrashRootAndRemoveIfEmpty(final Path p) throws IOException {
} else {
if (fileStatuses.length == 1
&& !fileStatuses[0].isDirectory()
&& !fileStatuses[0].getPath().equals(p)) {
&& fileStatuses[0].getPath().toUri().getPath().equals(
trashRoot.toString())) {
// Ignore the trash path because it is not a directory.
DFSClient.LOG.warn("{} is not a directory.", trashRoot);
DFSClient.LOG.warn("{} is not a directory. Ignored.", trashRoot);
} else {
throw new IOException("Found non-empty trash root at " +
trashRoot + ". Rename or delete it, then try again.");
@ -3002,19 +3009,24 @@ private Path provisionSnapshotTrash(
Path trashPath = new Path(path, FileSystem.TRASH_PREFIX);
try {
FileStatus trashFileStatus = getFileStatus(trashPath);
boolean throwException = false;
String errMessage = "Can't provision trash for snapshottable directory " +
pathStr + " because trash path " + trashPath.toString() +
" already exists.";
if (!trashFileStatus.isDirectory()) {
throwException = true;
errMessage += "\r\n" +
"WARNING: " + trashPath.toString() + " is not a directory.";
}
if (!trashFileStatus.getPermission().equals(trashPermission)) {
throwException = true;
errMessage += "\r\n" +
"WARNING: Permission of " + trashPath.toString() +
" differs from provided permission " + trashPermission;
}
if (throwException) {
throw new FileAlreadyExistsException(errMessage);
}
} catch (FileNotFoundException ignored) {
// Trash path doesn't exist. Continue
}

View File

@ -67,7 +67,6 @@
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class HdfsAdmin {
final private DistributedFileSystem dfs;
public static final FsPermission TRASH_PERMISSION = new FsPermission(
FsAction.ALL, FsAction.ALL, FsAction.ALL, true);

View File

@ -93,6 +93,7 @@
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.not;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
@ -925,6 +926,57 @@ private void verifyNodesAndCorruptBlocks(
.getECBlockGroupStats().getHighestPriorityLowRedundancyBlocks());
}
@Test
public void testAllowSnapshotWhenTrashExists() throws Exception {
final Path dirPath = new Path("/ssdir3");
final Path trashRoot = new Path(dirPath, ".Trash");
final DistributedFileSystem dfs = cluster.getFileSystem();
final DFSAdmin dfsAdmin = new DFSAdmin(conf);
// Case 1: trash directory exists and permission matches
dfs.mkdirs(trashRoot);
dfs.setPermission(trashRoot, TRASH_PERMISSION);
// allowSnapshot should still succeed even when trash exists
assertEquals(0, ToolRunner.run(dfsAdmin,
new String[]{"-allowSnapshot", dirPath.toString()}));
// Clean up. disallowSnapshot should remove the empty trash
assertEquals(0, ToolRunner.run(dfsAdmin,
new String[]{"-disallowSnapshot", dirPath.toString()}));
assertFalse(dfs.exists(trashRoot));
// Case 2: trash directory exists and but permission doesn't match
dfs.mkdirs(trashRoot);
dfs.setPermission(trashRoot, new FsPermission((short)0755));
// allowSnapshot should fail here
assertEquals(-1, ToolRunner.run(dfsAdmin,
new String[]{"-allowSnapshot", dirPath.toString()}));
// Correct trash permission and retry
dfs.setPermission(trashRoot, TRASH_PERMISSION);
assertEquals(0, ToolRunner.run(dfsAdmin,
new String[]{"-allowSnapshot", dirPath.toString()}));
// Clean up
assertEquals(0, ToolRunner.run(dfsAdmin,
new String[]{"-disallowSnapshot", dirPath.toString()}));
assertFalse(dfs.exists(trashRoot));
// Case 3: trash directory path is taken by a file
dfs.create(trashRoot).close();
// allowSnapshot should fail here
assertEquals(-1, ToolRunner.run(dfsAdmin,
new String[]{"-allowSnapshot", dirPath.toString()}));
// Remove the file and retry
dfs.delete(trashRoot, false);
assertEquals(0, ToolRunner.run(dfsAdmin,
new String[]{"-allowSnapshot", dirPath.toString()}));
// Clean up
assertEquals(0, ToolRunner.run(dfsAdmin,
new String[]{"-disallowSnapshot", dirPath.toString()}));
assertFalse(dfs.exists(trashRoot));
// Cleanup
dfs.delete(dirPath, true);
}
@Test
public void testAllowDisallowSnapshot() throws Exception {
final Path dirPath = new Path("/ssdir1");