diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStoragePolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStoragePolicy.java
index ffcfbdb89c..67f134dd0b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStoragePolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStoragePolicy.java
@@ -68,6 +68,17 @@ public BlockStoragePolicy getPolicy(byte id) {
     public BlockStoragePolicy getDefaultPolicy() {
       return getPolicy(defaultPolicyID);
     }
+
+    public BlockStoragePolicy getPolicy(String policyName) {
+      if (policies != null) {
+        for (BlockStoragePolicy policy : policies) {
+          if (policy != null && policy.name.equals(policyName)) {
+            return policy;
+          }
+        }
+      }
+      return null;
+    }
   }
 
   /** A 4-bit policy ID */
@@ -172,6 +183,10 @@ public String toString() {
         + ", replicationFallbacks=" + Arrays.asList(replicationFallbacks);
   }
 
+  public byte getId() {
+    return id;
+  }
+
   private static StorageType getFallback(EnumSet<StorageType> unavailables,
       StorageType[] fallbacks) {
     for(StorageType fb : fallbacks) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index b9af35ea25..d9eda25574 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -1645,6 +1645,25 @@ public boolean setReplication(String src, short replication)
     }
   }
 
+  /**
+   * Set storage policy for an existing file
+   * @param src file name
+   * @param policyName name of the storage policy
+   */
+  public void setStoragePolicy(String src, String policyName)
+      throws IOException {
+    try {
+      namenode.setStoragePolicy(src, policyName);
+    } catch (RemoteException e) {
+      throw e.unwrapRemoteException(AccessControlException.class,
+                                    FileNotFoundException.class,
+                                    SafeModeException.class,
+                                    NSQuotaExceededException.class,
+                                    UnresolvedPathException.class,
+                                    SnapshotAccessControlException.class);
+    }
+  }
+
   /**
    * Rename file or directory.
   * @see ClientProtocol#rename(String, String)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
index e20c61f518..d51d9e7dd9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
@@ -461,7 +461,33 @@ public Boolean next(final FileSystem fs, final Path p)
       }
     }.resolve(this, absF);
   }
-
+
+  public void setStoragePolicy(final Path src, final String policyName)
+      throws IOException {
+    statistics.incrementWriteOps(1);
+    Path absF = fixRelativePart(src);
+    new FileSystemLinkResolver<Void>() {
+      @Override
+      public Void doCall(final Path p)
+          throws IOException, UnresolvedLinkException {
+        dfs.setStoragePolicy(getPathName(p), policyName);
+        return null;
+      }
+      @Override
+      public Void next(final FileSystem fs, final Path p)
+          throws IOException {
+        if (fs instanceof DistributedFileSystem) {
+          ((DistributedFileSystem) fs).setStoragePolicy(p, policyName);
+          return null;
+        } else {
+          throw new UnsupportedOperationException(
+              "Cannot perform setStoragePolicy on a non-DistributedFileSystem: "
+              + src + " -> " + p);
+        }
+      }
+    }.resolve(this, absF);
+  }
+
   /**
    * Move blocks from srcs to trg and delete srcs afterwards.
    * The file block sizes must be the same.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
index 8dbe1f7609..c9cfe402aa 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
@@ -254,6 +254,20 @@ public boolean setReplication(String src, short replication)
       FileNotFoundException, SafeModeException, UnresolvedLinkException,
       SnapshotAccessControlException, IOException;
 
+  /**
+   * Set the storage policy for an existing file
+   * @param src Path of an existing file.
+   * @param policyName The name of the storage policy
+   * @throws SnapshotAccessControlException If access is denied
+   * @throws UnresolvedLinkException if src contains a symlink
+   * @throws FileNotFoundException If file/dir src is not found
+   * @throws QuotaExceededException If changes violate the quota restriction
+   */
+  @Idempotent
+  public void setStoragePolicy(String src, String policyName)
+      throws SnapshotAccessControlException, UnresolvedLinkException,
+      FileNotFoundException, QuotaExceededException, IOException;
+
   /**
    * Set permissions for an existing file/directory.
   *
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
index c4211b1d79..82a1a07a53 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
@@ -168,6 +168,8 @@
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetReplicationResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetSafeModeRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetSafeModeResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetStoragePolicyRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetStoragePolicyResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetTimesRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetTimesResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdateBlockForPipelineRequestProto;
@@ -225,6 +227,8 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
   static final GetSnapshottableDirListingResponseProto
       NULL_GET_SNAPSHOTTABLE_DIR_LISTING_RESPONSE =
       GetSnapshottableDirListingResponseProto.newBuilder().build();
+  static final SetStoragePolicyResponseProto VOID_SET_STORAGE_POLICY_RESPONSE =
+      SetStoragePolicyResponseProto.newBuilder().build();
 
   private static final CreateResponseProto VOID_CREATE_RESPONSE =
       CreateResponseProto.newBuilder().build();
@@ -1354,4 +1358,16 @@ public CheckAccessResponseProto checkAccess(RpcController controller,
     }
     return VOID_CHECKACCESS_RESPONSE;
   }
+
+  @Override
+  public SetStoragePolicyResponseProto setStoragePolicy(
+      RpcController controller, SetStoragePolicyRequestProto request)
+      throws ServiceException {
+    try {
+      server.setStoragePolicy(request.getSrc(), request.getPolicyName());
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+    return VOID_SET_STORAGE_POLICY_RESPONSE;
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
index 85dbb7d718..adfcf364f7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
@@ -60,7 +60,9 @@
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
+import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
+import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
 import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
 import org.apache.hadoop.hdfs.protocol.proto.AclProtos.GetAclStatusRequestProto;
@@ -146,6 +148,7 @@
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdateBlockForPipelineRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdatePipelineRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CheckAccessRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetStoragePolicyRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.GetXAttrsRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.ListXAttrsRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.RemoveXAttrRequestProto;
@@ -1359,4 +1362,17 @@ public void checkAccess(String path, FsAction mode) throws IOException {
       throw ProtobufHelper.getRemoteException(e);
     }
   }
+
+  @Override
+  public void setStoragePolicy(String src, String policyName)
+      throws SnapshotAccessControlException, UnresolvedLinkException,
+      FileNotFoundException, QuotaExceededException, IOException {
+    SetStoragePolicyRequestProto req = SetStoragePolicyRequestProto
+        .newBuilder().setSrc(src).setPolicyName(policyName).build();
+    try {
+      rpcProxy.setStoragePolicy(null, req);
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 1a8b420fd3..9cb3c6621d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -395,7 +395,11 @@ private static BlockTokenSecretManager createBlockTokenSecretManager(
           lifetimeMin*60*1000L, 0, null, encryptionAlgorithm);
     }
   }
-
+
+  public BlockStoragePolicy getStoragePolicy(final String policyName) {
+    return storagePolicySuite.getPolicy(policyName);
+  }
+
   public void setBlockPoolId(String blockPoolId) {
     if (isBlockTokenEnabled()) {
       blockTokenSecretManager.setBlockPoolId(blockPoolId);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
index c9b69a3f6e..8c7edcd1c5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
@@ -946,6 +946,28 @@ Block[] unprotectedSetReplication(String src, short replication,
     return file.getBlocks();
   }
 
+  /** Set block storage policy for a file */
+  void setStoragePolicy(String src, byte policyId)
+      throws SnapshotAccessControlException, UnresolvedLinkException,
+      FileNotFoundException, QuotaExceededException {
+    writeLock();
+    try {
+      unprotectedSetStoragePolicy(src, policyId);
+    } finally {
+      writeUnlock();
+    }
+  }
+
+  void unprotectedSetStoragePolicy(String src, byte policyId)
+      throws SnapshotAccessControlException, UnresolvedLinkException,
+      FileNotFoundException, QuotaExceededException {
+    assert hasWriteLock();
+    final INodesInPath iip = getINodesInPath4Write(src, true);
+    // TODO: currently we only support setting storage policy on a file
+    final INodeFile inode = INodeFile.valueOf(iip.getLastINode(), src);
+    inode.setStoragePolicyID(policyId, iip.getLatestSnapshotId());
+  }
+
   /**
    * @param path the file path
    * @return the block size of the file.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
index b2adcd455f..7434a06dc6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
@@ -82,6 +82,7 @@
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetPermissionsOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetQuotaOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetReplicationOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetStoragePolicyOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetXAttrOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SymlinkOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.TimesOp;
@@ -812,7 +813,16 @@ void logSetReplication(String src, short replication) {
       .setReplication(replication);
     logEdit(op);
   }
-
+
+  /**
+   * Add set storage policy id record to edit log
+   */
+  void logSetStoragePolicy(String src, byte policyId) {
+    SetStoragePolicyOp op = SetStoragePolicyOp.getInstance(cache.get())
+        .setPath(src).setPolicyId(policyId);
+    logEdit(op);
+  }
+
   /** Add set namespace quota record to edit log
    *
    * @param src the string representation of the path to a directory
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
index a721491948..3a13d7f177 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
@@ -78,6 +78,7 @@
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetPermissionsOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetQuotaOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetReplicationOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetStoragePolicyOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetXAttrOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RemoveXAttrOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SymlinkOp;
@@ -827,6 +828,13 @@ private long applyEditLogOp(FSEditLogOp op, FSDirectory fsDir,
       }
       break;
     }
+    case OP_SET_STORAGE_POLICY: {
+      SetStoragePolicyOp setStoragePolicyOp = (SetStoragePolicyOp) op;
+      fsDir.unprotectedSetStoragePolicy(
+          renameReservedPathsOnUpgrade(setStoragePolicyOp.path, logVersion),
+          setStoragePolicyOp.policyId);
+      break;
+    }
     default:
       throw new IOException("Invalid operation read " + op.opCode);
     }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
index 5543e0cb86..78fad3187f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
@@ -61,6 +61,7 @@
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_TIMES;
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_UPDATE_BLOCKS;
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_UPDATE_MASTER_KEY;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_STORAGE_POLICY;
 
 import java.io.DataInput;
 import java.io.DataInputStream;
@@ -193,6 +194,7 @@ public OpInstanceCache() {
           OP_ROLLING_UPGRADE_FINALIZE, "finalize"));
       inst.put(OP_SET_XATTR, new SetXAttrOp());
       inst.put(OP_REMOVE_XATTR, new RemoveXAttrOp());
+      inst.put(OP_SET_STORAGE_POLICY, new SetStoragePolicyOp());
     }
 
     public FSEditLogOp get(FSEditLogOpCodes opcode) {
@@ -3780,6 +3782,71 @@ static class RollbackException extends IOException {
     }
   }
 
+  /** {@literal @Idempotent} for {@link ClientProtocol#setStoragePolicy} */
+  static class SetStoragePolicyOp extends FSEditLogOp {
+    String path;
+    byte policyId;
+
+    private SetStoragePolicyOp() {
+      super(OP_SET_STORAGE_POLICY);
+    }
+
+    static SetStoragePolicyOp getInstance(OpInstanceCache cache) {
+      return (SetStoragePolicyOp) cache.get(OP_SET_STORAGE_POLICY);
+    }
+
+    SetStoragePolicyOp setPath(String path) {
+      this.path = path;
+      return this;
+    }
+
+    SetStoragePolicyOp setPolicyId(byte policyId) {
+      this.policyId = policyId;
+      return this;
+    }
+
+    @Override
+    public void writeFields(DataOutputStream out) throws IOException {
+      FSImageSerialization.writeString(path, out);
+      out.writeByte(policyId);
+    }
+
+    @Override
+    void readFields(DataInputStream in, int logVersion)
+        throws IOException {
+      this.path = FSImageSerialization.readString(in);
+      this.policyId = in.readByte();
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder builder = new StringBuilder();
+      builder.append("SetStoragePolicyOp [path=");
+      builder.append(path);
+      builder.append(", policyId=");
+      builder.append(policyId);
+      builder.append(", opCode=");
+      builder.append(opCode);
+      builder.append(", txid=");
+      builder.append(txid);
+      builder.append("]");
+      return builder.toString();
+    }
+
+    @Override
+    protected void toXml(ContentHandler contentHandler) throws SAXException {
+      XMLUtils.addSaxString(contentHandler, "PATH", path);
+      XMLUtils.addSaxString(contentHandler, "POLICYID",
+          Byte.valueOf(policyId).toString());
+    }
+
+    @Override
+    void fromXml(Stanza st) throws InvalidXmlException {
+      this.path = st.getValue("PATH");
+      this.policyId = Byte.valueOf(st.getValue("POLICYID"));
+    }
+  }
+
   /**
    * Class for writing editlog ops
    */
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java
index bf4bbb4a60..86be54adb7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java
@@ -72,6 +72,7 @@ public enum FSEditLogOpCodes {
   OP_ROLLING_UPGRADE_FINALIZE   ((byte) 42),
   OP_SET_XATTR                  ((byte) 43),
   OP_REMOVE_XATTR               ((byte) 44),
+  OP_SET_STORAGE_POLICY         ((byte) 45),
 
   // Note that the current range of the valid OP code is 0~127
   OP_INVALID                    ((byte) -1);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index d85c895b72..872a68a676 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -154,6 +154,7 @@
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.ha.ServiceFailedException;
+import org.apache.hadoop.hdfs.BlockStoragePolicy;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HAUtil;
@@ -2219,6 +2220,52 @@ private boolean setReplicationInt(String src, final short replication)
     return isFile;
   }
 
+  /**
+   * Set the storage policy for an existing file.
+   *
+   * @param src file name
+   * @param policyName storage policy name
+   */
+  void setStoragePolicy(String src, final String policyName)
+      throws IOException {
+    try {
+      setStoragePolicyInt(src, policyName);
+    } catch (AccessControlException e) {
+      logAuditEvent(false, "setStoragePolicy", src);
+      throw e;
+    }
+  }
+
+  private void setStoragePolicyInt(String src, final String policyName)
+      throws IOException {
+    checkSuperuserPrivilege();
+    checkOperation(OperationCategory.WRITE);
+    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
+    waitForLoadingFSImage();
+    HdfsFileStatus fileStat;
+    writeLock();
+    try {
+      checkOperation(OperationCategory.WRITE);
+      checkNameNodeSafeMode("Cannot set storage policy for " + src);
+      src = FSDirectory.resolvePath(src, pathComponents, dir);
+
+      // get the corresponding policy and make sure the policy name is valid
+      BlockStoragePolicy policy = blockManager.getStoragePolicy(policyName);
+      if (policy == null) {
+        throw new HadoopIllegalArgumentException(
+            "Cannot find a block policy with the name " + policyName);
+      }
+      dir.setStoragePolicy(src, policy.getId());
+      getEditLog().logSetStoragePolicy(src, policy.getId());
+      fileStat = getAuditFileInfo(src, false);
+    } finally {
+      writeUnlock();
+    }
+
+    getEditLog().logSync();
+    logAuditEvent(true, "setStoragePolicy", src, null, fileStat);
+  }
+
   long getPreferredBlockSize(String filename)
       throws IOException, UnresolvedLinkException {
     FSPermissionChecker pc = getPermissionChecker();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
index fc26881313..ffd6584798 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
@@ -372,6 +372,17 @@ public byte getStoragePolicyID() {
     return HeaderFormat.getStoragePolicyID(header);
   }
 
+  /** Set the policy id of the file */
+  public final void setStoragePolicyID(byte policyId) {
+    header = HeaderFormat.STORAGE_POLICY_ID.BITS.combine(policyId, header);
+  }
+
+  public final void setStoragePolicyID(byte policyId, int lastSnapshotId)
+      throws QuotaExceededException {
+    recordModification(lastSnapshotId);
+    setStoragePolicyID(policyId);
+  }
+
   @Override
   public long getHeaderLong() {
     return header;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index 199d728889..d79068bdeb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -579,7 +579,13 @@ public boolean setReplication(String src, short replication)
       throws IOException {
     return namesystem.setReplication(src, replication);
   }
-
+
+  @Override
+  public void setStoragePolicy(String src, String policyName)
+      throws IOException {
+    namesystem.setStoragePolicy(src, policyName);
+  }
+
   @Override // ClientProtocol
   public void setPermission(String src, FsPermission permissions)
       throws IOException {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto
index cd291a6860..a019e79124 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto
@@ -97,6 +97,14 @@ message SetReplicationResponseProto {
   required bool result = 1;
 }
 
+message SetStoragePolicyRequestProto {
+  required string src = 1;
+  required string policyName = 2;
+}
+
+message SetStoragePolicyResponseProto { // void response
+}
+
 message SetPermissionRequestProto {
   required string src = 1;
   required FsPermissionProto permission = 2;
@@ -671,6 +679,8 @@ service ClientNamenodeProtocol {
   rpc append(AppendRequestProto) returns(AppendResponseProto);
   rpc setReplication(SetReplicationRequestProto)
       returns(SetReplicationResponseProto);
+  rpc setStoragePolicy(SetStoragePolicyRequestProto)
+      returns(SetStoragePolicyResponseProto);
   rpc setPermission(SetPermissionRequestProto)
       returns(SetPermissionResponseProto);
   rpc setOwner(SetOwnerRequestProto) returns(SetOwnerResponseProto);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java
index d07cf5f991..9e46df50af 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java
@@ -17,12 +17,19 @@
  */
 package org.apache.hadoop.hdfs;
 
+import java.io.FileNotFoundException;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
+import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
+import org.apache.hadoop.hdfs.server.namenode.INodeFile;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -30,8 +37,10 @@ public class TestBlockStoragePolicy {
   public static final BlockStoragePolicy.Suite POLICY_SUITE;
   public static final BlockStoragePolicy DEFAULT_STORAGE_POLICY;
+  public static final Configuration conf;
+
   static {
-    final Configuration conf = new HdfsConfiguration();
+    conf = new HdfsConfiguration();
     POLICY_SUITE = BlockStoragePolicy.readBlockStorageSuite(conf);
     DEFAULT_STORAGE_POLICY = POLICY_SUITE.getDefaultPolicy();
   }
@@ -41,11 +50,15 @@ public class TestBlockStoragePolicy {
   static final EnumSet<StorageType> disk = EnumSet.of(StorageType.DISK);
   static final EnumSet<StorageType> both = EnumSet.of(StorageType.DISK,
       StorageType.ARCHIVE);
 
+  static final long FILE_LEN = 1024;
+  static final short REPLICATION = 3;
+
+  static final byte COLD = (byte) 4;
+  static final byte WARM = (byte) 8;
+  static final byte HOT = (byte) 12;
+
   @Test
   public void testDefaultPolicies() throws Exception {
-    final byte COLD = (byte)4;
-    final byte WARM = (byte)8;
-    final byte HOT = (byte)12;
     final Map<Byte, String> expectedPolicyStrings = new HashMap<Byte, String>();
     expectedPolicyStrings.put(COLD, "BlockStoragePolicy{COLD:4, storageTypes=[ARCHIVE], creationFallbacks=[], replicationFallbacks=[]");
@@ -119,4 +132,81 @@ static void assertReplicationFallback(BlockStoragePolicy policy, StorageType non
     Assert.assertEquals(diskExpected, policy.getReplicationFallback(disk));
     Assert.assertEquals(null, policy.getReplicationFallback(both));
   }
+
+  @Test
+  public void testSetStoragePolicy() throws Exception {
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(REPLICATION).build();
+    cluster.waitActive();
+    FSDirectory fsdir = cluster.getNamesystem().getFSDirectory();
+    final DistributedFileSystem fs = cluster.getFileSystem();
+    try {
+      final Path dir = new Path("/testSetStoragePolicy");
+      final Path fooFile = new Path(dir, "foo");
+      final Path barDir = new Path(dir, "bar");
+      final Path barFile1 = new Path(barDir, "f1");
+      final Path barFile2 = new Path(barDir, "f2");
+      DFSTestUtil.createFile(fs, fooFile, FILE_LEN, REPLICATION, 0L);
+      DFSTestUtil.createFile(fs, barFile1, FILE_LEN, REPLICATION, 0L);
+      DFSTestUtil.createFile(fs, barFile2, FILE_LEN, REPLICATION, 0L);
+
+      final String invalidPolicyName = "INVALID-POLICY";
+      try {
+        fs.setStoragePolicy(fooFile, invalidPolicyName);
+        Assert.fail("Should throw a HadoopIllegalArgumentException");
+      } catch (RemoteException e) {
+        GenericTestUtils.assertExceptionContains(invalidPolicyName, e);
+      }
+
+      // check internal status
+      INodeFile fooFileNode = fsdir.getINode4Write(fooFile.toString()).asFile();
+      INodeFile barFile1Node = fsdir.getINode4Write(barFile1.toString()).asFile();
+      INodeFile barFile2Node = fsdir.getINode4Write(barFile2.toString()).asFile();
+
+      final Path invalidPath = new Path("/invalidPath");
+      try {
+        fs.setStoragePolicy(invalidPath, "WARM");
+        Assert.fail("Should throw a FileNotFoundException");
+      } catch (FileNotFoundException e) {
+        GenericTestUtils.assertExceptionContains(invalidPath.toString(), e);
+      }
+
+      fs.setStoragePolicy(fooFile, "COLD");
+      fs.setStoragePolicy(barFile1, "WARM");
+      fs.setStoragePolicy(barFile2, "WARM");
+      // TODO: set storage policy on a directory
+
+      // check internal status
+      Assert.assertEquals(COLD, fooFileNode.getStoragePolicyID());
+      Assert.assertEquals(WARM, barFile1Node.getStoragePolicyID());
+      Assert.assertEquals(WARM, barFile2Node.getStoragePolicyID());
+
+      // restart namenode to make sure the editlog is correct
+      cluster.restartNameNode(true);
+      fsdir = cluster.getNamesystem().getFSDirectory();
+      fooFileNode = fsdir.getINode4Write(fooFile.toString()).asFile();
+      Assert.assertEquals(COLD, fooFileNode.getStoragePolicyID());
+      barFile1Node = fsdir.getINode4Write(barFile1.toString()).asFile();
+      Assert.assertEquals(WARM, barFile1Node.getStoragePolicyID());
+      barFile2Node = fsdir.getINode4Write(barFile2.toString()).asFile();
+      Assert.assertEquals(WARM, barFile2Node.getStoragePolicyID());
+
+      // restart namenode with checkpoint to make sure the fsimage is correct
+      fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
+      fs.saveNamespace();
+      fs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
+      cluster.restartNameNode(true);
+      fsdir = cluster.getNamesystem().getFSDirectory();
+      fooFileNode = fsdir.getINode4Write(fooFile.toString()).asFile();
+      Assert.assertEquals(COLD, fooFileNode.getStoragePolicyID());
+      barFile1Node = fsdir.getINode4Write(barFile1.toString()).asFile();
+      Assert.assertEquals(WARM, barFile1Node.getStoragePolicyID());
+      barFile2Node = fsdir.getINode4Write(barFile2.toString()).asFile();
+      Assert.assertEquals(WARM, barFile2Node.getStoragePolicyID());
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
 }
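---
For reference (not part of the patch), a minimal client-side sketch of how the API introduced above might be used once the change is in place. The file path and the "COLD" policy name are illustrative assumptions; per the FSNamesystem change, setStoragePolicy requires superuser privilege on the NameNode and in this patch only accepts existing files.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;

public class SetStoragePolicySketch {
  public static void main(String[] args) throws Exception {
    // Assumes fs.defaultFS points at an HDFS cluster whose NameNode and
    // client libraries include this patch.
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    if (fs instanceof DistributedFileSystem) {
      // setStoragePolicy(Path, String) is the method added to
      // DistributedFileSystem in this patch; "/data/archive/part-0000"
      // is a hypothetical existing file.
      ((DistributedFileSystem) fs).setStoragePolicy(
          new Path("/data/archive/part-0000"), "COLD");
    } else {
      System.err.println("Not an HDFS file system: " + fs.getUri());
    }
  }
}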