HDFS-6847. Support storage policy on directories and include storage policy in HdfsFileStatus. Contributed by Jing Zhao

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-6584@1618416 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Tsz-wo Sze 2014-08-16 20:58:20 +00:00
parent 37207b75d4
commit 9b250d74f0
27 changed files with 338 additions and 100 deletions

View File

@ -20,6 +20,9 @@ HDFS-6584: Archival Storage
HDFS-6686. Change BlockPlacementPolicy to use fallback when some storage
types are unavailable. (szetszwo)
HDFS-6847. Support storage policy on directories and include storage policy
in HdfsFileStatus. (Jing Zhao via szetszwo)
Trunk (Unreleased)
INCOMPATIBLE CHANGES

View File

@ -27,6 +27,8 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.XAttr;
import org.apache.hadoop.fs.XAttr.NameSpace;
/**
* A block storage policy describes how to select the storage types
@ -44,9 +46,13 @@ public class BlockStoragePolicy {
= "dfs.block.storage.policy.creation-fallback.";
public static final String DFS_BLOCK_STORAGE_POLICY_REPLICATION_FALLBACK_KEY_PREFIX
= "dfs.block.storage.policy.replication-fallback.";
public static final String STORAGE_POLICY_XATTR_NAME = "bsp";
/** set the namespace to TRUSTED so that only privilege users can access */
public static final NameSpace XAttrNS = NameSpace.TRUSTED;
public static final int ID_BIT_LENGTH = 4;
public static final int ID_MAX = (1 << ID_BIT_LENGTH) - 1;
public static final byte ID_UNSPECIFIED = 0;
/** A block storage policy suite. */
public static class Suite {
@ -299,6 +305,20 @@ public static Suite readBlockStorageSuite(Configuration conf) {
return new Suite(firstID, policies);
}
public static String buildXAttrName() {
return XAttrNS.toString().toLowerCase() + "." + STORAGE_POLICY_XATTR_NAME;
}
public static XAttr buildXAttr(byte policyId) {
final String name = buildXAttrName();
return XAttrHelper.buildXAttr(name, new byte[] { policyId });
}
public static boolean isStoragePolicyXAttr(XAttr xattr) {
return xattr != null && xattr.getNameSpace() == BlockStoragePolicy.XAttrNS
&& xattr.getName().equals(BlockStoragePolicy.STORAGE_POLICY_XATTR_NAME);
}
private static void throwIllegalArgumentException(String message,
Configuration conf) {
throw new IllegalArgumentException(message + " in "

View File

@ -1646,8 +1646,8 @@ public boolean setReplication(String src, short replication)
}
/**
* Set storage policy for an existing file
* @param src file name
* Set storage policy for an existing file/directory
* @param src file/directory name
* @param policyName name of the storage policy
*/
public void setStoragePolicy(String src, String policyName)

View File

@ -255,8 +255,8 @@ public boolean setReplication(String src, short replication)
SnapshotAccessControlException, IOException;
/**
* Set the storage policy for an existing file
* @param src Path of an existing file.
* Set the storage policy for a file/directory
* @param src Path of an existing file/directory.
* @param policyName The name of the storage policy
* @throws SnapshotAccessControlException If access is denied
* @throws UnresolvedLinkException if <code>src</code> contains a symlink

View File

@ -47,6 +47,7 @@ public class HdfsFileStatus {
// Used by dir, not including dot and dotdot. Always zero for a regular file.
private final int childrenNum;
private final byte storagePolicy;
public static final byte[] EMPTY_NAME = new byte[0];
@ -65,9 +66,9 @@ public class HdfsFileStatus {
* @param fileId the file id
*/
public HdfsFileStatus(long length, boolean isdir, int block_replication,
long blocksize, long modification_time, long access_time,
FsPermission permission, String owner, String group,
byte[] symlink, byte[] path, long fileId, int childrenNum) {
long blocksize, long modification_time, long access_time,
FsPermission permission, String owner, String group, byte[] symlink,
byte[] path, long fileId, int childrenNum, byte storagePolicy) {
this.length = length;
this.isdir = isdir;
this.block_replication = (short)block_replication;
@ -85,6 +86,7 @@ public HdfsFileStatus(long length, boolean isdir, int block_replication,
this.path = path;
this.fileId = fileId;
this.childrenNum = childrenNum;
this.storagePolicy = storagePolicy;
}
/**
@ -242,6 +244,11 @@ final public int getChildrenNum() {
return childrenNum;
}
/** @return the storage policy id */
public final byte getStoragePolicy() {
return storagePolicy;
}
final public FileStatus makeQualified(URI defaultUri, Path path) {
return new FileStatus(getLen(), isDir(), getReplication(),
getBlockSize(), getModificationTime(),

View File

@ -56,10 +56,10 @@ public HdfsLocatedFileStatus(long length, boolean isdir,
int block_replication, long blocksize, long modification_time,
long access_time, FsPermission permission, String owner, String group,
byte[] symlink, byte[] path, long fileId, LocatedBlocks locations,
int childrenNum) {
int childrenNum, byte storagePolicy) {
super(length, isdir, block_replication, blocksize, modification_time,
access_time, permission, owner, group, symlink, path, fileId,
childrenNum);
childrenNum, storagePolicy);
this.locations = locations;
}

View File

@ -24,6 +24,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.BlockStoragePolicy;
import org.apache.hadoop.hdfs.DFSUtil;
/**
@ -61,7 +62,7 @@ public SnapshottableDirectoryStatus(long modification_time, long access_time,
int snapshotNumber, int snapshotQuota, byte[] parentFullPath) {
this.dirStatus = new HdfsFileStatus(0, true, 0, 0, modification_time,
access_time, permission, owner, group, null, localName, inodeId,
childrenNum);
childrenNum, BlockStoragePolicy.ID_UNSPECIFIED);
this.snapshotNumber = snapshotNumber;
this.snapshotQuota = snapshotQuota;
this.parentFullPath = parentFullPath;

View File

@ -42,6 +42,7 @@
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.ha.proto.HAServiceProtocolProtos;
import org.apache.hadoop.hdfs.BlockStoragePolicy;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.Block;
@ -1315,7 +1316,9 @@ public static HdfsFileStatus convert(HdfsFileStatusProto fs) {
fs.getPath().toByteArray(),
fs.hasFileId()? fs.getFileId(): INodeId.GRANDFATHER_INODE_ID,
fs.hasLocations() ? PBHelper.convert(fs.getLocations()) : null,
fs.hasChildrenNum() ? fs.getChildrenNum() : -1);
fs.hasChildrenNum() ? fs.getChildrenNum() : -1,
fs.hasStoragePolicy() ? (byte) fs.getStoragePolicy()
: BlockStoragePolicy.ID_UNSPECIFIED);
}
public static SnapshottableDirectoryStatus convert(
@ -1361,12 +1364,14 @@ public static HdfsFileStatusProto convert(HdfsFileStatus fs) {
setGroup(fs.getGroup()).
setFileId(fs.getFileId()).
setChildrenNum(fs.getChildrenNum()).
setPath(ByteString.copyFrom(fs.getLocalNameInBytes()));
setPath(ByteString.copyFrom(fs.getLocalNameInBytes())).
setStoragePolicy(fs.getStoragePolicy());
if (fs.isSymlink()) {
builder.setSymlink(ByteString.copyFrom(fs.getSymlinkInBytes()));
}
if (fs instanceof HdfsLocatedFileStatus) {
LocatedBlocks locations = ((HdfsLocatedFileStatus)fs).getBlockLocations();
final HdfsLocatedFileStatus lfs = (HdfsLocatedFileStatus) fs;
LocatedBlocks locations = lfs.getBlockLocations();
if (locations != null) {
builder.setLocations(PBHelper.convert(locations));
}

View File

@ -46,6 +46,7 @@
import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.BlockStoragePolicy;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
@ -946,10 +947,9 @@ Block[] unprotectedSetReplication(String src, short replication,
return file.getBlocks();
}
/** Set block storage policy for a file */
/** Set block storage policy for a directory */
void setStoragePolicy(String src, byte policyId)
throws SnapshotAccessControlException, UnresolvedLinkException,
FileNotFoundException, QuotaExceededException {
throws IOException {
writeLock();
try {
unprotectedSetStoragePolicy(src, policyId);
@ -959,13 +959,30 @@ void setStoragePolicy(String src, byte policyId)
}
void unprotectedSetStoragePolicy(String src, byte policyId)
throws SnapshotAccessControlException, UnresolvedLinkException,
FileNotFoundException, QuotaExceededException {
throws IOException {
assert hasWriteLock();
final INodesInPath iip = getINodesInPath4Write(src, true);
// TODO: currently we only support setting storage policy on a file
final INodeFile inode = INodeFile.valueOf(iip.getLastINode(), src);
inode.setStoragePolicyID(policyId, iip.getLatestSnapshotId());
final INode inode = iip.getLastINode();
if (inode == null) {
throw new FileNotFoundException("File/Directory does not exist: " + src);
}
final int snapshotId = iip.getLatestSnapshotId();
if (inode.isFile()) {
inode.asFile().setStoragePolicyID(policyId, snapshotId);
} else if (inode.isDirectory()) {
setDirStoragePolicy(inode.asDirectory(), policyId, snapshotId);
} else {
throw new FileNotFoundException(src + " is not a file or directory");
}
}
private void setDirStoragePolicy(INodeDirectory inode, byte policyId,
int latestSnapshotId) throws IOException {
List<XAttr> existingXAttrs = XAttrStorage.readINodeXAttrs(inode);
XAttr xAttr = BlockStoragePolicy.buildXAttr(policyId);
List<XAttr> newXAttrs = setINodeXAttrs(existingXAttrs, Arrays.asList(xAttr),
EnumSet.of(XAttrSetFlag.CREATE, XAttrSetFlag.REPLACE));
XAttrStorage.updateINodeXAttrs(inode, newXAttrs, latestSnapshotId);
}
/**
@ -1313,7 +1330,8 @@ private static void checkSnapshot(INode target,
* @return a partial listing starting after startAfter
*/
DirectoryListing getListing(String src, byte[] startAfter,
boolean needLocation) throws UnresolvedLinkException, IOException {
boolean needLocation, boolean isSuperUser)
throws UnresolvedLinkException, IOException {
String srcs = normalizePath(src);
readLock();
@ -1321,16 +1339,19 @@ DirectoryListing getListing(String src, byte[] startAfter,
if (srcs.endsWith(HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR)) {
return getSnapshotsListing(srcs, startAfter);
}
final INodesInPath inodesInPath = getLastINodeInPath(srcs, true);
final INodesInPath inodesInPath = getINodesInPath(srcs, true);
final int snapshot = inodesInPath.getPathSnapshotId();
final INode targetNode = inodesInPath.getINode(0);
final INode[] inodes = inodesInPath.getINodes();
final INode targetNode = inodes[inodes.length - 1];
byte parentStoragePolicy = isSuperUser ? getStoragePolicy(inodes,
snapshot) : BlockStoragePolicy.ID_UNSPECIFIED;
if (targetNode == null)
return null;
if (!targetNode.isDirectory()) {
return new DirectoryListing(
new HdfsFileStatus[]{createFileStatus(HdfsFileStatus.EMPTY_NAME,
targetNode, needLocation, snapshot)}, 0);
targetNode, needLocation, parentStoragePolicy, snapshot)}, 0);
}
final INodeDirectory dirInode = targetNode.asDirectory();
@ -1343,8 +1364,10 @@ DirectoryListing getListing(String src, byte[] startAfter,
HdfsFileStatus listing[] = new HdfsFileStatus[numOfListing];
for (int i=0; i<numOfListing && locationBudget>0; i++) {
INode cur = contents.get(startChild+i);
byte curPolicy = cur.getStoragePolicyID(snapshot);
listing[i] = createFileStatus(cur.getLocalNameBytes(), cur,
needLocation, snapshot);
needLocation, curPolicy != BlockStoragePolicy.ID_UNSPECIFIED ?
curPolicy : parentStoragePolicy, snapshot);
listingCnt++;
if (needLocation) {
// Once we hit lsLimit locations, stop.
@ -1395,7 +1418,7 @@ private DirectoryListing getSnapshotsListing(String src, byte[] startAfter)
for (int i = 0; i < numOfListing; i++) {
Root sRoot = snapshots.get(i + skipSize).getRoot();
listing[i] = createFileStatus(sRoot.getLocalNameBytes(), sRoot,
Snapshot.CURRENT_STATE_ID);
BlockStoragePolicy.ID_UNSPECIFIED, Snapshot.CURRENT_STATE_ID);
}
return new DirectoryListing(
listing, snapshots.size() - skipSize - numOfListing);
@ -1417,8 +1440,8 @@ HdfsFileStatus getFileInfo(String src, boolean resolveLink)
}
final INodesInPath inodesInPath = getLastINodeInPath(srcs, resolveLink);
final INode i = inodesInPath.getINode(0);
return i == null? null: createFileStatus(HdfsFileStatus.EMPTY_NAME, i,
inodesInPath.getPathSnapshotId());
return i == null ? null : createFileStatus(HdfsFileStatus.EMPTY_NAME, i,
BlockStoragePolicy.ID_UNSPECIFIED, inodesInPath.getPathSnapshotId());
} finally {
readUnlock();
}
@ -1435,7 +1458,7 @@ private HdfsFileStatus getFileInfo4DotSnapshot(String src)
throws UnresolvedLinkException {
if (getINode4DotSnapshot(src) != null) {
return new HdfsFileStatus(0, true, 0, 0, 0, 0, null, null, null, null,
HdfsFileStatus.EMPTY_NAME, -1L, 0);
HdfsFileStatus.EMPTY_NAME, -1L, 0, BlockStoragePolicy.ID_UNSPECIFIED);
}
return null;
}
@ -2247,18 +2270,19 @@ void reset() {
* @throws IOException if any error occurs
*/
private HdfsFileStatus createFileStatus(byte[] path, INode node,
boolean needLocation, int snapshot) throws IOException {
boolean needLocation, byte storagePolicy, int snapshot) throws IOException {
if (needLocation) {
return createLocatedFileStatus(path, node, snapshot);
return createLocatedFileStatus(path, node, storagePolicy, snapshot);
} else {
return createFileStatus(path, node, snapshot);
return createFileStatus(path, node, storagePolicy, snapshot);
}
}
/**
* Create FileStatus by file INode
*/
HdfsFileStatus createFileStatus(byte[] path, INode node,
int snapshot) {
HdfsFileStatus createFileStatus(byte[] path, INode node, byte storagePolicy,
int snapshot) {
long size = 0; // length is zero for directories
short replication = 0;
long blocksize = 0;
@ -2284,14 +2308,24 @@ HdfsFileStatus createFileStatus(byte[] path, INode node,
node.isSymlink() ? node.asSymlink().getSymlink() : null,
path,
node.getId(),
childrenNum);
childrenNum, storagePolicy);
}
private byte getStoragePolicy(INode[] inodes, int snapshotId) {
for (int i = inodes.length - 1; i >= 0; i--) {
byte policy = inodes[i].getStoragePolicyID(snapshotId);
if (policy != BlockStoragePolicy.ID_UNSPECIFIED) {
return policy;
}
}
return BlockStoragePolicy.ID_UNSPECIFIED;
}
/**
* Create FileStatus with location info by file INode
*/
private HdfsLocatedFileStatus createLocatedFileStatus(byte[] path,
INode node, int snapshot) throws IOException {
INode node, byte storagePolicy, int snapshot) throws IOException {
assert hasReadLock();
long size = 0; // length is zero for directories
short replication = 0;
@ -2324,7 +2358,7 @@ private HdfsLocatedFileStatus createLocatedFileStatus(byte[] path,
getPermissionForFileStatus(node, snapshot),
node.getUserName(snapshot), node.getGroupName(snapshot),
node.isSymlink() ? node.asSymlink().getSymlink() : null, path,
node.getId(), loc, childrenNum);
node.getId(), loc, childrenNum, storagePolicy);
// Set caching information for the located blocks.
if (loc != null) {
CacheManager cacheManager = namesystem.getCacheManager();

View File

@ -33,6 +33,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.XAttrSetFlag;
import org.apache.hadoop.hdfs.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
@ -365,7 +366,8 @@ private long applyEditLogOp(FSEditLogOp op, FSDirectory fsDir,
// add the op into retry cache if necessary
if (toAddRetryCache) {
HdfsFileStatus stat = fsNamesys.dir.createFileStatus(
HdfsFileStatus.EMPTY_NAME, newFile, Snapshot.CURRENT_STATE_ID);
HdfsFileStatus.EMPTY_NAME, newFile,
BlockStoragePolicy.ID_UNSPECIFIED, Snapshot.CURRENT_STATE_ID);
fsNamesys.addCacheEntryWithPayload(addCloseOp.rpcClientId,
addCloseOp.rpcCallId, stat);
}

View File

@ -2221,9 +2221,9 @@ private boolean setReplicationInt(String src, final short replication)
}
/**
* Set the storage policy for an existing file.
* Set the storage policy for a file or a directory.
*
* @param src file name
* @param src file/directory path
* @param policyName storage policy name
*/
void setStoragePolicy(String src, final String policyName)
@ -4530,15 +4530,17 @@ private DirectoryListing getListingInt(String src, byte[] startAfter,
}
}
boolean isSuperUser = true;
if (isPermissionEnabled) {
if (dir.isDir(src)) {
checkPathAccess(pc, src, FsAction.READ_EXECUTE);
} else {
checkTraverse(pc, src);
}
isSuperUser = pc.isSuperUser();
}
logAuditEvent(true, "listStatus", src);
dl = dir.getListing(src, startAfter, needLocation);
dl = dir.getListing(src, startAfter, needLocation, isSuperUser);
} finally {
readUnlock();
}

View File

@ -684,6 +684,14 @@ public final INode setAccessTime(long accessTime, int latestSnapshotId)
return this;
}
/**
* @return the storage policy id of the inode
*/
public abstract byte getStoragePolicyID(int snapshotId);
public byte getStoragePolicyID() {
return getStoragePolicyID(Snapshot.CURRENT_STATE_ID);
}
/**
* Breaks {@code path} into components.

View File

@ -61,6 +61,9 @@ public interface INodeAttributes {
/** @return the access time. */
public long getAccessTime();
/** @return the storage policy ID */
public byte getStoragePolicyID();
/** A read-only copy of the inode attributes. */
public static abstract class SnapshotCopy implements INodeAttributes {
private final byte[] name;

View File

@ -26,7 +26,9 @@
import java.util.Map;
import org.apache.hadoop.fs.PathIsNotDirectoryException;
import org.apache.hadoop.fs.XAttr;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.BlockStoragePolicy;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
import org.apache.hadoop.hdfs.protocol.SnapshotException;
@ -40,6 +42,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
/**
* Directory INode class.
@ -103,6 +106,22 @@ public final INodeDirectory asDirectory() {
return this;
}
@Override
public byte getStoragePolicyID(int snapshotId) {
if (snapshotId != Snapshot.CURRENT_STATE_ID) {
return getSnapshotINode(snapshotId).getStoragePolicyID();
}
XAttrFeature f = getXAttrFeature();
ImmutableList<XAttr> xattrs = f == null ? ImmutableList.<XAttr> of() : f
.getXAttrs();
for (XAttr xattr : xattrs) {
if (BlockStoragePolicy.isStoragePolicyXAttr(xattr)) {
return (xattr.getValue())[0];
}
}
return BlockStoragePolicy.ID_UNSPECIFIED;
}
void setQuota(long nsQuota, long dsQuota) {
DirectoryWithQuotaFeature quota = getDirectoryWithQuotaFeature();
if (quota != null) {

View File

@ -18,10 +18,12 @@
package org.apache.hadoop.hdfs.server.namenode;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.XAttr;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.server.namenode.XAttrFeature;
import org.apache.hadoop.hdfs.BlockStoragePolicy;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
/**
* The attributes of an inode.
@ -58,6 +60,19 @@ && getPermissionLong() == other.getPermissionLong()
&& getAclFeature() == other.getAclFeature()
&& getXAttrFeature() == other.getXAttrFeature();
}
@Override
public byte getStoragePolicyID() {
XAttrFeature f = getXAttrFeature();
ImmutableList<XAttr> xattrs = f == null ? ImmutableList.<XAttr> of() : f
.getXAttrs();
for (XAttr xattr : xattrs) {
if (BlockStoragePolicy.isStoragePolicyXAttr(xattr)) {
return (xattr.getValue())[0];
}
}
return BlockStoragePolicy.ID_UNSPECIFIED;
}
}
public static class CopyWithQuota extends INodeDirectoryAttributes.SnapshotCopy {

View File

@ -367,20 +367,28 @@ public long getPreferredBlockSize() {
return HeaderFormat.getPreferredBlockSize(header);
}
@Override
public byte getStoragePolicyID(int snapshotId) {
if (snapshotId != Snapshot.CURRENT_STATE_ID) {
return getSnapshotINode(snapshotId).getStoragePolicyID();
}
return getStoragePolicyID();
}
@Override
public byte getStoragePolicyID() {
return HeaderFormat.getStoragePolicyID(header);
}
/** Set the policy id of the file */
public final void setStoragePolicyID(byte policyId) {
header = HeaderFormat.STORAGE_POLICY_ID.BITS.combine(policyId, header);
private void setStoragePolicyID(byte storagePolicyId) {
header = HeaderFormat.STORAGE_POLICY_ID.BITS.combine(storagePolicyId,
header);
}
public final void setStoragePolicyID(byte policyId, int lastSnapshotId)
throws QuotaExceededException {
recordModification(lastSnapshotId);
setStoragePolicyID(policyId);
public final void setStoragePolicyID(byte storagePolicyId,
int latestSnapshotId) throws QuotaExceededException {
recordModification(latestSnapshotId);
setStoragePolicyID(storagePolicyId);
}
@Override

View File

@ -33,9 +33,6 @@ public interface INodeFileAttributes extends INodeAttributes {
/** @return preferred block size in bytes */
public long getPreferredBlockSize();
/** @return the storage policy ID. */
public byte getStoragePolicyID();
/** @return the header as a long. */
public long getHeaderLong();

View File

@ -22,6 +22,7 @@
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
import org.apache.hadoop.hdfs.server.namenode.Quota.Counts;
import org.apache.hadoop.util.GSet;
@ -121,6 +122,11 @@ public Counts cleanSubtree(int snapshotId, int priorSnapshotId,
boolean countDiffChange) throws QuotaExceededException {
return null;
}
@Override
public byte getStoragePolicyID(int snapshotId) {
return BlockStoragePolicy.ID_UNSPECIFIED;
}
};
return map.get(inode);

View File

@ -286,6 +286,11 @@ public final void setAccessTime(long accessTime) {
referred.setAccessTime(accessTime);
}
@Override
public final byte getStoragePolicyID(int snapshotId) {
return referred.getStoragePolicyID(snapshotId);
}
@Override
final void recordModification(int latestSnapshotId)
throws QuotaExceededException {

View File

@ -145,4 +145,10 @@ public void removeXAttrFeature() {
public void addXAttrFeature(XAttrFeature f) {
throw new UnsupportedOperationException("XAttrs are not supported on symlinks");
}
@Override
public byte getStoragePolicyID(int snapshotId) {
throw new UnsupportedOperationException(
"Storage policy are not supported on symlinks");
}
}

View File

@ -21,6 +21,7 @@
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.BlockStoragePolicy;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.XAttrHelper;
import org.apache.hadoop.hdfs.protocol.*;
@ -220,6 +221,7 @@ public static String toJsonString(final HdfsFileStatus status,
m.put("replication", status.getReplication());
m.put("fileId", status.getFileId());
m.put("childrenNum", status.getChildrenNum());
m.put("storagePolicy", status.getStoragePolicy());
return includeType ? toJsonString(FileStatus.class, m): JSON.toString(m);
}
@ -250,9 +252,12 @@ public static HdfsFileStatus toFileStatus(final Map<?, ?> json, boolean includes
Long childrenNumLong = (Long) m.get("childrenNum");
final int childrenNum = (childrenNumLong == null) ? -1
: childrenNumLong.intValue();
final byte storagePolicy = m.containsKey("storagePolicy") ?
(byte) (long) (Long) m.get("storagePolicy") :
BlockStoragePolicy.ID_UNSPECIFIED;
return new HdfsFileStatus(len, type == PathType.DIRECTORY, replication,
blockSize, mTime, aTime, permission, owner, group,
symlink, DFSUtil.string2Bytes(localName), fileId, childrenNum);
blockSize, mTime, aTime, permission, owner, group, symlink,
DFSUtil.string2Bytes(localName), fileId, childrenNum, storagePolicy);
}
/** Convert an ExtendedBlock to a Json map. */

View File

@ -244,6 +244,7 @@ message HdfsFileStatusProto {
// Optional field for fileId
optional uint64 fileId = 13 [default = 0]; // default as an invalid id
optional int32 childrenNum = 14 [default = -1];
optional uint32 storagePolicy = 15 [default = 0]; // block storage policy id
}
/**

View File

@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hdfs;
import static org.apache.hadoop.hdfs.BlockStoragePolicy.ID_UNSPECIFIED;
import java.io.FileNotFoundException;
import java.util.EnumSet;
import java.util.HashMap;
@ -25,9 +27,10 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
import org.apache.hadoop.hdfs.server.namenode.INodeFile;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Assert;
@ -133,12 +136,18 @@ static void assertReplicationFallback(BlockStoragePolicy policy, StorageType non
Assert.assertEquals(null, policy.getReplicationFallback(both));
}
private void checkDirectoryListing(HdfsFileStatus[] stats, byte... policies) {
Assert.assertEquals(stats.length, policies.length);
for (int i = 0; i < stats.length; i++) {
Assert.assertEquals(stats[i].getStoragePolicy(), policies[i]);
}
}
@Test
public void testSetStoragePolicy() throws Exception {
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(REPLICATION).build();
cluster.waitActive();
FSDirectory fsdir = cluster.getNamesystem().getFSDirectory();
final DistributedFileSystem fs = cluster.getFileSystem();
try {
final Path dir = new Path("/testSetStoragePolicy");
@ -158,10 +167,13 @@ public void testSetStoragePolicy() throws Exception {
GenericTestUtils.assertExceptionContains(invalidPolicyName, e);
}
// check internal status
INodeFile fooFileNode = fsdir.getINode4Write(fooFile.toString()).asFile();
INodeFile barFile1Node = fsdir.getINode4Write(barFile1.toString()).asFile();
INodeFile barFile2Node = fsdir.getINode4Write(barFile2.toString()).asFile();
// check storage policy
HdfsFileStatus[] dirList = fs.getClient().listPaths(dir.toString(),
HdfsFileStatus.EMPTY_NAME, true).getPartialListing();
HdfsFileStatus[] barList = fs.getClient().listPaths(barDir.toString(),
HdfsFileStatus.EMPTY_NAME, true).getPartialListing();
checkDirectoryListing(dirList, ID_UNSPECIFIED, ID_UNSPECIFIED);
checkDirectoryListing(barList, ID_UNSPECIFIED, ID_UNSPECIFIED);
final Path invalidPath = new Path("/invalidPath");
try {
@ -172,37 +184,116 @@ public void testSetStoragePolicy() throws Exception {
}
fs.setStoragePolicy(fooFile, "COLD");
fs.setStoragePolicy(barFile1, "WARM");
fs.setStoragePolicy(barFile2, "WARM");
// TODO: set storage policy on a directory
fs.setStoragePolicy(barDir, "WARM");
fs.setStoragePolicy(barFile2, "HOT");
// check internal status
Assert.assertEquals(COLD, fooFileNode.getStoragePolicyID());
Assert.assertEquals(WARM, barFile1Node.getStoragePolicyID());
Assert.assertEquals(WARM, barFile2Node.getStoragePolicyID());
dirList = fs.getClient().listPaths(dir.toString(),
HdfsFileStatus.EMPTY_NAME).getPartialListing();
barList = fs.getClient().listPaths(barDir.toString(),
HdfsFileStatus.EMPTY_NAME).getPartialListing();
checkDirectoryListing(dirList, WARM, COLD); // bar is warm, foo is cold
checkDirectoryListing(barList, WARM, HOT);
// restart namenode to make sure the editlog is correct
cluster.restartNameNode(true);
fsdir = cluster.getNamesystem().getFSDirectory();
fooFileNode = fsdir.getINode4Write(fooFile.toString()).asFile();
Assert.assertEquals(COLD, fooFileNode.getStoragePolicyID());
barFile1Node = fsdir.getINode4Write(barFile1.toString()).asFile();
Assert.assertEquals(WARM, barFile1Node.getStoragePolicyID());
barFile2Node = fsdir.getINode4Write(barFile2.toString()).asFile();
Assert.assertEquals(WARM, barFile2Node.getStoragePolicyID());
dirList = fs.getClient().listPaths(dir.toString(),
HdfsFileStatus.EMPTY_NAME, true).getPartialListing();
barList = fs.getClient().listPaths(barDir.toString(),
HdfsFileStatus.EMPTY_NAME, true).getPartialListing();
checkDirectoryListing(dirList, WARM, COLD); // bar is warm, foo is cold
checkDirectoryListing(barList, WARM, HOT);
// restart namenode with checkpoint to make sure the fsimage is correct
fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
fs.saveNamespace();
fs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
cluster.restartNameNode(true);
fsdir = cluster.getNamesystem().getFSDirectory();
fooFileNode = fsdir.getINode4Write(fooFile.toString()).asFile();
Assert.assertEquals(COLD, fooFileNode.getStoragePolicyID());
barFile1Node = fsdir.getINode4Write(barFile1.toString()).asFile();
Assert.assertEquals(WARM, barFile1Node.getStoragePolicyID());
barFile2Node = fsdir.getINode4Write(barFile2.toString()).asFile();
Assert.assertEquals(WARM, barFile2Node.getStoragePolicyID());
dirList = fs.getClient().listPaths(dir.toString(),
HdfsFileStatus.EMPTY_NAME).getPartialListing();
barList = fs.getClient().listPaths(barDir.toString(),
HdfsFileStatus.EMPTY_NAME).getPartialListing();
checkDirectoryListing(dirList, WARM, COLD); // bar is warm, foo is cold
checkDirectoryListing(barList, WARM, HOT);
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
@Test
public void testSetStoragePolicyWithSnapshot() throws Exception {
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(REPLICATION).build();
cluster.waitActive();
final DistributedFileSystem fs = cluster.getFileSystem();
try {
final Path dir = new Path("/testSetStoragePolicyWithSnapshot");
final Path fooDir = new Path(dir, "foo");
final Path fooFile1= new Path(fooDir, "f1");
final Path fooFile2= new Path(fooDir, "f2");
DFSTestUtil.createFile(fs, fooFile1, FILE_LEN, REPLICATION, 0L);
DFSTestUtil.createFile(fs, fooFile2, FILE_LEN, REPLICATION, 0L);
fs.setStoragePolicy(fooDir, "WARM");
HdfsFileStatus[] dirList = fs.getClient().listPaths(dir.toString(),
HdfsFileStatus.EMPTY_NAME, true).getPartialListing();
checkDirectoryListing(dirList, WARM);
HdfsFileStatus[] fooList = fs.getClient().listPaths(fooDir.toString(),
HdfsFileStatus.EMPTY_NAME, true).getPartialListing();
checkDirectoryListing(fooList, WARM, WARM);
// take snapshot
SnapshotTestHelper.createSnapshot(fs, dir, "s1");
// change the storage policy of fooFile1
fs.setStoragePolicy(fooFile1, "COLD");
fooList = fs.getClient().listPaths(fooDir.toString(),
HdfsFileStatus.EMPTY_NAME).getPartialListing();
checkDirectoryListing(fooList, COLD, WARM);
// check the policy for /dir/.snapshot/s1/foo/f1
Path s1f1 = SnapshotTestHelper.getSnapshotPath(dir, "s1", "foo/f1");
DirectoryListing f1Listing = fs.getClient().listPaths(s1f1.toString(),
HdfsFileStatus.EMPTY_NAME);
checkDirectoryListing(f1Listing.getPartialListing(), WARM);
// delete f1
fs.delete(fooFile1, true);
fooList = fs.getClient().listPaths(fooDir.toString(),
HdfsFileStatus.EMPTY_NAME).getPartialListing();
checkDirectoryListing(fooList, WARM);
// check the policy for /dir/.snapshot/s1/foo/f1 again after the deletion
checkDirectoryListing(fs.getClient().listPaths(s1f1.toString(),
HdfsFileStatus.EMPTY_NAME).getPartialListing(), WARM);
// change the storage policy of foo dir
fs.setStoragePolicy(fooDir, "HOT");
// /dir/foo is now hot
dirList = fs.getClient().listPaths(dir.toString(),
HdfsFileStatus.EMPTY_NAME, true).getPartialListing();
checkDirectoryListing(dirList, HOT);
// /dir/foo/f2 is hot
fooList = fs.getClient().listPaths(fooDir.toString(),
HdfsFileStatus.EMPTY_NAME).getPartialListing();
checkDirectoryListing(fooList, HOT);
// check storage policy of snapshot path
Path s1 = SnapshotTestHelper.getSnapshotRoot(dir, "s1");
Path s1foo = SnapshotTestHelper.getSnapshotPath(dir, "s1", "foo");
checkDirectoryListing(fs.getClient().listPaths(s1.toString(),
HdfsFileStatus.EMPTY_NAME).getPartialListing(), WARM);
// /dir/.snapshot/.s1/foo/f1 and /dir/.snapshot/.s1/foo/f2 are warm
checkDirectoryListing(fs.getClient().listPaths(s1foo.toString(),
HdfsFileStatus.EMPTY_NAME).getPartialListing(), WARM, WARM);
// delete foo
fs.delete(fooDir, true);
checkDirectoryListing(fs.getClient().listPaths(s1.toString(),
HdfsFileStatus.EMPTY_NAME).getPartialListing(), WARM);
checkDirectoryListing(fs.getClient().listPaths(s1foo.toString(),
HdfsFileStatus.EMPTY_NAME).getPartialListing(), WARM, WARM);
} finally {
if (cluster != null) {
cluster.shutdown();

View File

@ -253,12 +253,12 @@ public Object answer(InvocationOnMock invocation)
Mockito.doReturn(
new HdfsFileStatus(0, false, 1, 1024, 0, 0, new FsPermission(
(short) 777), "owner", "group", new byte[0], new byte[0],
1010, 0)).when(mockNN).getFileInfo(anyString());
1010, 0, (byte) 0)).when(mockNN).getFileInfo(anyString());
Mockito.doReturn(
new HdfsFileStatus(0, false, 1, 1024, 0, 0, new FsPermission(
(short) 777), "owner", "group", new byte[0], new byte[0],
1010, 0))
1010, 0, (byte) 0))
.when(mockNN)
.create(anyString(), (FsPermission) anyObject(), anyString(),
(EnumSetWritable<CreateFlag>) anyObject(), anyBoolean(),

View File

@ -17,11 +17,11 @@
*/
package org.apache.hadoop.hdfs;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.anyShort;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Matchers.anyShort;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.spy;
@ -35,7 +35,6 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path;
@ -339,12 +338,12 @@ public void testFactory() throws Exception {
Mockito.doReturn(
new HdfsFileStatus(0, false, 1, 1024, 0, 0, new FsPermission(
(short) 777), "owner", "group", new byte[0], new byte[0],
1010, 0)).when(mcp).getFileInfo(anyString());
1010, 0, (byte) 0)).when(mcp).getFileInfo(anyString());
Mockito
.doReturn(
new HdfsFileStatus(0, false, 1, 1024, 0, 0, new FsPermission(
(short) 777), "owner", "group", new byte[0], new byte[0],
1010, 0))
1010, 0, (byte) 0))
.when(mcp)
.create(anyString(), (FsPermission) anyObject(), anyString(),
(EnumSetWritable<CreateFlag>) anyObject(), anyBoolean(),

View File

@ -1015,10 +1015,11 @@ public void testFsckFileNotFound() throws Exception {
path = DFSUtil.string2Bytes(pathString);
long fileId = 312321L;
int numChildren = 1;
byte storagePolicy = 0;
HdfsFileStatus file = new HdfsFileStatus(length, isDir, blockReplication,
blockSize, modTime, accessTime, perms, owner, group, symlink, path,
fileId, numChildren);
fileId, numChildren, storagePolicy);
Result res = new Result(conf);
try {

View File

@ -64,7 +64,7 @@ public void testHdfsFileStatus() {
final HdfsFileStatus status = new HdfsFileStatus(1001L, false, 3, 1L << 26,
now, now + 10, new FsPermission((short) 0644), "user", "group",
DFSUtil.string2Bytes("bar"), DFSUtil.string2Bytes("foo"),
INodeId.GRANDFATHER_INODE_ID, 0);
INodeId.GRANDFATHER_INODE_ID, 0, (byte) 0);
final FileStatus fstatus = toFileStatus(status, parent);
System.out.println("status = " + status);
System.out.println("fstatus = " + fstatus);