HDFS-4729. Fix OfflineImageViewer and permission checking for snapshot operations. Contributed by Jing Zhao

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-2802@1471665 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Tsz-wo Sze 2013-04-24 20:47:13 +00:00
parent 0fa5cad0b2
commit e8b64d3851
6 changed files with 69 additions and 18 deletions

View File

@ -296,3 +296,6 @@ Branch-2802 Snapshot (Unreleased)
HDFS-4686. Update quota computation for rename and INodeReference.
(Jing Zhao via szetszwo)
HDFS-4729. Fix OfflineImageViewer and permission checking for snapshot
operations. (Jing Zhao via szetszwo)

View File

@ -5813,7 +5813,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
/** Allow snapshot on a directroy. */
void allowSnapshot(String path) throws SafeModeException, IOException {
final FSPermissionChecker pc = getPermissionChecker();
writeLock();
try {
checkOperation(OperationCategory.WRITE);
@ -5821,7 +5820,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
throw new SafeModeException("Cannot allow snapshot for " + path,
safeMode);
}
checkOwner(pc, path);
checkSuperuserPrivilege();
snapshotManager.setSnapshottable(path);
getEditLog().logAllowSnapshot(path);
@ -5837,7 +5836,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
/** Disallow snapshot on a directory. */
void disallowSnapshot(String path) throws SafeModeException, IOException {
final FSPermissionChecker pc = getPermissionChecker();
writeLock();
try {
checkOperation(OperationCategory.WRITE);
@ -5845,7 +5843,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
throw new SafeModeException("Cannot disallow snapshot for " + path,
safeMode);
}
checkOwner(pc, path);
checkSuperuserPrivilege();
snapshotManager.resetSnapshottable(path);
getEditLog().logDisallowSnapshot(path);
@ -5875,7 +5873,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
throw new SafeModeException("Cannot create snapshot for "
+ snapshotRoot, safeMode);
}
if (isPermissionEnabled) {
checkOwner(pc, snapshotRoot);
}
if (snapshotName == null || snapshotName.isEmpty()) {
snapshotName = Snapshot.generateDefaultSnapshotName();
@ -5917,7 +5917,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
throw new SafeModeException("Cannot rename snapshot for " + path,
safeMode);
}
if (isPermissionEnabled) {
checkOwner(pc, path);
}
dir.verifySnapshotName(snapshotNewName, path);
snapshotManager.renameSnapshot(path, snapshotOldName, snapshotNewName);
@ -5978,9 +5980,14 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
SnapshotDiffReport getSnapshotDiffReport(String path,
String fromSnapshot, String toSnapshot) throws IOException {
SnapshotDiffInfo diffs = null;
final FSPermissionChecker pc = getPermissionChecker();
readLock();
try {
checkOperation(OperationCategory.READ);
if (isPermissionEnabled) {
checkSubtreeReadPermission(pc, path, fromSnapshot);
checkSubtreeReadPermission(pc, path, toSnapshot);
}
diffs = snapshotManager.diff(path, fromSnapshot, toSnapshot);
} finally {
readUnlock();
@ -5994,6 +6001,14 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
Collections.<DiffReportEntry> emptyList());
}
private void checkSubtreeReadPermission(final FSPermissionChecker pc,
final String snapshottablePath, final String snapshot)
throws AccessControlException, UnresolvedLinkException {
final String fromPath = snapshot == null?
snapshottablePath: Snapshot.getSnapshotPath(snapshottablePath, snapshot);
checkPermission(pc, fromPath, false, null, null, FsAction.READ, FsAction.READ);
}
/**
* Delete a snapshot of a snapshottable directory
* @param snapshotRoot The snapshottable directory

View File

@ -22,6 +22,8 @@ import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.permission.FsPermission;
@ -30,6 +32,7 @@ import org.apache.hadoop.hdfs.protocol.LayoutVersion;
import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.namenode.FSImageSerialization;
import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
import org.apache.hadoop.hdfs.tools.offlineImageViewer.ImageVisitor.ImageElement;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableUtils;
@ -126,6 +129,8 @@ class ImageLoaderCurrent implements ImageLoader {
-40, -41, -42, -43};
private int imageVersion = 0;
private final Map<String, String> nodeMap = new HashMap<String, String>();
/* (non-Javadoc)
* @see ImageLoader#canProcessVersion(int)
*/
@ -163,16 +168,15 @@ class ImageLoaderCurrent implements ImageLoader {
v.visit(ImageElement.TRANSACTION_ID, in.readLong());
}
if (LayoutVersion.supports(Feature.ADD_INODE_ID, imageVersion)) {
v.visit(ImageElement.LAST_INODE_ID, in.readLong());
}
boolean supportSnapshot = LayoutVersion.supports(Feature.SNAPSHOT,
imageVersion);
if (supportSnapshot) {
v.visit(ImageElement.SNAPSHOT_COUNTER, in.readInt());
v.visit(ImageElement.NUM_SNAPSHOTS_TOTAL, in.readInt());
v.visit(ImageElement.NUM_SNAPSHOTTABLE_DIRS, in.readInt());
}
if (LayoutVersion.supports(Feature.ADD_INODE_ID, imageVersion)) {
v.visit(ImageElement.LAST_INODE_ID, in.readLong());
}
if (LayoutVersion.supports(Feature.FSIMAGE_COMPRESSION, imageVersion)) {
@ -192,6 +196,7 @@ class ImageLoaderCurrent implements ImageLoader {
}
}
processINodes(in, v, numInodes, skipBlocks, supportSnapshot);
nodeMap.clear();
processINodesUC(in, v, skipBlocks);
@ -438,6 +443,12 @@ class ImageLoaderCurrent implements ImageLoader {
boolean skipBlocks) throws IOException {
// 1. load dir name
String dirName = FSImageSerialization.readString(in);
String oldValue = nodeMap.put(dirName, dirName);
if (oldValue != null) { // the subtree has been visited
return;
}
// 2. load possible snapshots
processSnapshots(in, v, dirName);
// 3. load children nodes
@ -622,6 +633,21 @@ class ImageLoaderCurrent implements ImageLoader {
}
} else if (numBlocks == -2) {
v.visit(ImageElement.SYMLINK, Text.readString(in));
} else if (numBlocks == -3) {
final boolean isWithName = in.readBoolean();
int dstSnapshotId = Snapshot.INVALID_ID;
if (!isWithName) {
dstSnapshotId = in.readInt();
}
v.visit(ImageElement.SNAPSHOT_DST_SNAPSHOT_ID, dstSnapshotId);
final boolean firstReferred = in.readBoolean();
if (firstReferred) {
v.visitEnclosingElement(ImageElement.SNAPSHOT_REF_INODE);
processINode(in, v, skipBlocks, parentName, isSnapshotCopy);
v.leaveEnclosingElement(); // referred inode
} else {
v.visit(ImageElement.SNAPSHOT_REF_INODE_ID, in.readLong());
}
}
processPermission(in, v);

View File

@ -86,7 +86,6 @@ abstract class ImageVisitor {
SNAPSHOT_COUNTER,
NUM_SNAPSHOTS_TOTAL,
NUM_SNAPSHOTTABLE_DIRS,
NUM_SNAPSHOTS,
SNAPSHOTS,
SNAPSHOT,
@ -110,7 +109,10 @@ abstract class ImageVisitor {
SNAPSHOT_FILE_DIFFS,
SNAPSHOT_FILE_DIFF,
NUM_SNAPSHOT_FILE_DIFF,
SNAPSHOT_FILE_SIZE
SNAPSHOT_FILE_SIZE,
SNAPSHOT_DST_SNAPSHOT_ID,
SNAPSHOT_REF_INODE_ID,
SNAPSHOT_REF_INODE
}
/**

View File

@ -172,8 +172,8 @@ public class TestSnapshottableDirListing {
Path dir2_user1 = new Path("/dir2_user1");
fs1.mkdirs(dir1_user1);
fs1.mkdirs(dir2_user1);
fs1.allowSnapshot(dir1_user1);
fs1.allowSnapshot(dir2_user1);
hdfs.allowSnapshot(dir1_user1);
hdfs.allowSnapshot(dir2_user1);
// user2
UserGroupInformation ugi2 = UserGroupInformation.createUserForTesting(
@ -184,8 +184,8 @@ public class TestSnapshottableDirListing {
Path subdir_user2 = new Path(dir_user2, "subdir");
fs2.mkdirs(dir_user2);
fs2.mkdirs(subdir_user2);
fs2.allowSnapshot(dir_user2);
fs2.allowSnapshot(subdir_user2);
hdfs.allowSnapshot(dir_user2);
hdfs.allowSnapshot(subdir_user2);
// super user
String supergroup = conf.get(DFS_PERMISSIONS_SUPERUSERGROUP_KEY,

View File

@ -289,6 +289,7 @@ public class TestOfflineImageViewer {
while((line = br.readLine()) != null)
readLsLine(line, fileContents);
br.close();
return fileContents;
}
@ -391,6 +392,7 @@ public class TestOfflineImageViewer {
File outputFile = new File(ROOT, "/fileDistributionCheckOutput");
int totalFiles = 0;
BufferedReader reader = null;
try {
copyFile(originalFsimage, testFile);
ImageVisitor v = new FileDistributionVisitor(outputFile.getPath(), 0, 0);
@ -399,7 +401,7 @@ public class TestOfflineImageViewer {
oiv.go();
BufferedReader reader = new BufferedReader(new FileReader(outputFile));
reader = new BufferedReader(new FileReader(outputFile));
String line = reader.readLine();
assertEquals(line, "Size\tNumFiles");
while((line = reader.readLine()) != null) {
@ -408,6 +410,9 @@ public class TestOfflineImageViewer {
totalFiles += Integer.parseInt(row[1]);
}
} finally {
if (reader != null) {
reader.close();
}
if(testFile.exists()) testFile.delete();
if(outputFile.exists()) outputFile.delete();
}