diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index d96101bfb7..56280f3a8b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -1211,13 +1211,31 @@ public DFSOutputStream create(String src, FsPermission permission, long blockSize, Progressable progress, int buffersize, ChecksumOpt checksumOpt, InetSocketAddress[] favoredNodes, String ecPolicyName) throws IOException { + return create(src, permission, flag, createParent, replication, blockSize, + progress, buffersize, checksumOpt, favoredNodes, ecPolicyName, null); + } + + /** + * Same as {@link #create(String, FsPermission, EnumSet, boolean, short, long, + * addition of Progressable, int, ChecksumOpt, InetSocketAddress[], String)} + * with the storagePolicy that is used to specify a specific storage policy + * instead of inheriting any policy from this new file's parent directory. + * This policy will be persisted in HDFS. A value of null means inheriting + * parent groups' whatever policy. + */ + public DFSOutputStream create(String src, FsPermission permission, + EnumSet flag, boolean createParent, short replication, + long blockSize, Progressable progress, int buffersize, + ChecksumOpt checksumOpt, InetSocketAddress[] favoredNodes, + String ecPolicyName, String storagePolicy) + throws IOException { checkOpen(); final FsPermission masked = applyUMask(permission); LOG.debug("{}: masked={}", src, masked); final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this, src, masked, flag, createParent, replication, blockSize, progress, dfsClientConf.createChecksum(checksumOpt), - getFavoredNodesStr(favoredNodes), ecPolicyName); + getFavoredNodesStr(favoredNodes), ecPolicyName, storagePolicy); beginFileLease(result.getFileId(), result); return result; } @@ -1271,7 +1289,7 @@ public DFSOutputStream primitiveCreate(String src, FsPermission absPermission, DataChecksum checksum = dfsClientConf.createChecksum(checksumOpt); result = DFSOutputStream.newStreamForCreate(this, src, absPermission, flag, createParent, replication, blockSize, progress, checksum, - null, null); + null, null, null); } beginFileLease(result.getFileId(), result); return result; diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java index b8aae97030..aaef8ad909 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java @@ -260,7 +260,8 @@ protected DFSOutputStream(DFSClient dfsClient, String src, static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src, FsPermission masked, EnumSet flag, boolean createParent, short replication, long blockSize, Progressable progress, - DataChecksum checksum, String[] favoredNodes, String ecPolicyName) + DataChecksum checksum, String[] favoredNodes, String ecPolicyName, + String storagePolicy) throws IOException { try (TraceScope ignored = dfsClient.newPathTraceScope("newStreamForCreate", src)) { @@ -275,7 +276,8 @@ static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src, try { stat = dfsClient.namenode.create(src, masked, dfsClient.clientName, new EnumSetWritable<>(flag), createParent, replication, - blockSize, SUPPORTED_CRYPTO_VERSIONS, ecPolicyName); + blockSize, SUPPORTED_CRYPTO_VERSIONS, ecPolicyName, + storagePolicy); break; } catch (RemoteException re) { IOException e = re.unwrapRemoteException( diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java index ed37f1dcdf..7956d8eda9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java @@ -563,13 +563,17 @@ public FSDataOutputStream next(final FileSystem fs, final Path p) * replication policy from its ancestor (the default). * ecPolicyName and SHOULD_REPLICATE CreateFlag are mutually exclusive. It's * invalid to set both SHOULD_REPLICATE and a non-null ecPolicyName. + * The third addition is storagePolicyName. A non-null storage Policy + * specifies an explicit storage policy for this file, overriding the + * inherited policy. * */ private HdfsDataOutputStream create(final Path f, final FsPermission permission, final EnumSet flag, final int bufferSize, final short replication, final long blockSize, final Progressable progress, final ChecksumOpt checksumOpt, - final InetSocketAddress[] favoredNodes, final String ecPolicyName) + final InetSocketAddress[] favoredNodes, final String ecPolicyName, + final String storagePolicy) throws IOException { statistics.incrementWriteOps(1); storageStatistics.incrementOpCounter(OpType.CREATE); @@ -579,7 +583,7 @@ private HdfsDataOutputStream create(final Path f, public HdfsDataOutputStream doCall(final Path p) throws IOException { final DFSOutputStream out = dfs.create(getPathName(f), permission, flag, true, replication, blockSize, progress, bufferSize, - checksumOpt, favoredNodes, ecPolicyName); + checksumOpt, favoredNodes, ecPolicyName, storagePolicy); return dfs.createWrappedOutputStream(out, statistics); } @Override @@ -588,7 +592,8 @@ public HdfsDataOutputStream next(final FileSystem fs, final Path p) if (fs instanceof DistributedFileSystem) { DistributedFileSystem myDfs = (DistributedFileSystem)fs; return myDfs.create(p, permission, flag, bufferSize, replication, - blockSize, progress, checksumOpt, favoredNodes, ecPolicyName); + blockSize, progress, checksumOpt, favoredNodes, ecPolicyName, + storagePolicy); } throw new UnsupportedOperationException("Cannot create with" + " favoredNodes through a symlink to a non-DistributedFileSystem: " @@ -619,14 +624,15 @@ protected HdfsDataOutputStream primitiveCreate(Path f, * * @see #create(Path, FsPermission, EnumSet, int, short, long, Progressable, * ChecksumOpt, InetSocketAddress[], String) for the descriptions of - * additional parameters, i.e., favoredNodes and ecPolicyName. + * additional parameters, i.e., favoredNodes, ecPolicyName and + * storagePolicyName. */ private HdfsDataOutputStream createNonRecursive(final Path f, final FsPermission permission, final EnumSet flag, final int bufferSize, final short replication, final long blockSize, final Progressable progress, final ChecksumOpt checksumOpt, - final InetSocketAddress[] favoredNodes, final String ecPolicyName) - throws IOException { + final InetSocketAddress[] favoredNodes, final String ecPolicyName, + final String storagePolicyName) throws IOException { statistics.incrementWriteOps(1); storageStatistics.incrementOpCounter(OpType.CREATE); Path absF = fixRelativePart(f); @@ -635,7 +641,7 @@ private HdfsDataOutputStream createNonRecursive(final Path f, public HdfsDataOutputStream doCall(final Path p) throws IOException { final DFSOutputStream out = dfs.create(getPathName(f), permission, flag, false, replication, blockSize, progress, bufferSize, - checksumOpt, favoredNodes, ecPolicyName); + checksumOpt, favoredNodes, ecPolicyName, storagePolicyName); return dfs.createWrappedOutputStream(out, statistics); } @Override @@ -645,7 +651,7 @@ public HdfsDataOutputStream next(final FileSystem fs, final Path p) DistributedFileSystem myDfs = (DistributedFileSystem)fs; return myDfs.createNonRecursive(p, permission, flag, bufferSize, replication, blockSize, progress, checksumOpt, favoredNodes, - ecPolicyName); + ecPolicyName, storagePolicyName); } throw new UnsupportedOperationException("Cannot create with" + " favoredNodes through a symlink to a non-DistributedFileSystem: " @@ -3183,6 +3189,7 @@ public static final class HdfsDataOutputStreamBuilder private final DistributedFileSystem dfs; private InetSocketAddress[] favoredNodes = null; private String ecPolicyName = null; + private String storagePolicyName = null; /** * Construct a HdfsDataOutputStream builder for a file. @@ -3254,6 +3261,22 @@ public HdfsDataOutputStreamBuilder noLocalWrite() { return this; } + @VisibleForTesting + String getStoragePolicyName() { + return storagePolicyName; + } + + /** + * Enforce a file to follow the specified storage policy irrespective of the + * storage policy of its parent directory. + */ + public HdfsDataOutputStreamBuilder storagePolicyName( + @Nonnull final String policyName) { + Preconditions.checkNotNull(policyName); + storagePolicyName = policyName; + return this; + } + @VisibleForTesting String getEcPolicyName() { return ecPolicyName; @@ -3320,11 +3343,12 @@ public FSDataOutputStream build() throws IOException { return dfs.create(getPath(), getPermission(), getFlags(), getBufferSize(), getReplication(), getBlockSize(), getProgress(), getChecksumOpt(), getFavoredNodes(), - getEcPolicyName()); + getEcPolicyName(), getStoragePolicyName()); } else { return dfs.createNonRecursive(getPath(), getPermission(), getFlags(), getBufferSize(), getReplication(), getBlockSize(), getProgress(), - getChecksumOpt(), getFavoredNodes(), getEcPolicyName()); + getChecksumOpt(), getFavoredNodes(), getEcPolicyName(), + getStoragePolicyName()); } } else if (getFlags().contains(CreateFlag.APPEND)) { return dfs.append(getPath(), getFlags(), getBufferSize(), getProgress(), diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java index 953e48a932..da93707551 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java @@ -176,6 +176,7 @@ LocatedBlocks getBlockLocations(String src, long offset, long length) * policy. ecPolicyName and SHOULD_REPLICATE CreateFlag * are mutually exclusive. It's invalid to set both * SHOULD_REPLICATE flag and a non-null ecPolicyName. + *@param storagePolicy the name of the storage policy. * * @return the status of the created file, it could be null if the server * doesn't support returning the file status @@ -209,7 +210,8 @@ LocatedBlocks getBlockLocations(String src, long offset, long length) HdfsFileStatus create(String src, FsPermission masked, String clientName, EnumSetWritable flag, boolean createParent, short replication, long blockSize, - CryptoProtocolVersion[] supportedVersions, String ecPolicyName) + CryptoProtocolVersion[] supportedVersions, String ecPolicyName, + String storagePolicy) throws IOException; /** diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java index 65ebc2cc89..a23ae48de3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java @@ -345,7 +345,8 @@ public FsServerDefaults getServerDefaults() throws IOException { public HdfsFileStatus create(String src, FsPermission masked, String clientName, EnumSetWritable flag, boolean createParent, short replication, long blockSize, - CryptoProtocolVersion[] supportedVersions, String ecPolicyName) + CryptoProtocolVersion[] supportedVersions, String ecPolicyName, + String storagePolicy) throws IOException { CreateRequestProto.Builder builder = CreateRequestProto.newBuilder() .setSrc(src) @@ -358,6 +359,9 @@ public HdfsFileStatus create(String src, FsPermission masked, if (ecPolicyName != null) { builder.setEcPolicyName(ecPolicyName); } + if (storagePolicy != null) { + builder.setStoragePolicy(storagePolicy); + } FsPermission unmasked = masked.getUnmasked(); if (unmasked != null) { builder.setUnmasked(PBHelperClient.convert(unmasked)); diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto index 7343997be2..d08ad9b4f8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto @@ -83,6 +83,7 @@ message CreateRequestProto { repeated CryptoProtocolVersionProto cryptoProtocolVersion = 8; optional FsPermissionProto unmasked = 9; optional string ecPolicyName = 10; + optional string storagePolicy = 11; } message CreateResponseProto { diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java index 3c8465b7da..344401f4f3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java @@ -192,7 +192,8 @@ public FsServerDefaults getServerDefaults() throws IOException { public HdfsFileStatus create(String src, FsPermission masked, String clientName, EnumSetWritable flag, boolean createParent, short replication, long blockSize, - CryptoProtocolVersion[] supportedVersions, String ecPolicyName) + CryptoProtocolVersion[] supportedVersions, String ecPolicyName, + String storagePolicy) throws IOException { rpcServer.checkOperation(NameNode.OperationCategory.WRITE); @@ -213,9 +214,9 @@ public HdfsFileStatus create(String src, FsPermission masked, new Class[] {String.class, FsPermission.class, String.class, EnumSetWritable.class, boolean.class, short.class, long.class, CryptoProtocolVersion[].class, - String.class}, + String.class, String.class}, createLocation.getDest(), masked, clientName, flag, createParent, - replication, blockSize, supportedVersions, ecPolicyName); + replication, blockSize, supportedVersions, ecPolicyName, storagePolicy); return (HdfsFileStatus) rpcClient.invokeSingle(createLocation, method); } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java index 4e2bb82f07..36d3c81e16 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java @@ -500,10 +500,11 @@ public FsServerDefaults getServerDefaults() throws IOException { public HdfsFileStatus create(String src, FsPermission masked, String clientName, EnumSetWritable flag, boolean createParent, short replication, long blockSize, - CryptoProtocolVersion[] supportedVersions, String ecPolicyName) + CryptoProtocolVersion[] supportedVersions, String ecPolicyName, + String storagePolicy) throws IOException { return clientProto.create(src, masked, clientName, flag, createParent, - replication, blockSize, supportedVersions, ecPolicyName); + replication, blockSize, supportedVersions, ecPolicyName, storagePolicy); } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java index a2a7b189a6..a32cba147d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java @@ -884,7 +884,7 @@ public void testProxyGetAdditionalDatanode() HdfsFileStatus status = routerProtocol.create( newRouterFile, new FsPermission("777"), clientName, new EnumSetWritable(createFlag), true, (short) 1, - (long) 1024, CryptoProtocolVersion.supported(), null); + (long) 1024, CryptoProtocolVersion.supported(), null, null); // Add a block via router (requires client to have same lease) LocatedBlock block = routerProtocol.addBlock( diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java index e4a2f0b655..6673baa58e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java @@ -478,7 +478,7 @@ public CreateResponseProto create(RpcController controller, (short) req.getReplication(), req.getBlockSize(), PBHelperClient.convertCryptoProtocolVersions( req.getCryptoProtocolVersionList()), - req.getEcPolicyName()); + req.getEcPolicyName(), req.getStoragePolicy()); if (result != null) { return CreateResponseProto.newBuilder().setFs(PBHelperClient.convert(result)) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java index 2875708b72..9b0a64d75a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java @@ -361,7 +361,8 @@ static HdfsFileStatus startFile( EnumSet flag, boolean createParent, short replication, long blockSize, FileEncryptionInfo feInfo, INode.BlocksMapUpdateInfo toRemoveBlocks, - boolean shouldReplicate, String ecPolicyName, boolean logRetryEntry) + boolean shouldReplicate, String ecPolicyName, String storagePolicy, + boolean logRetryEntry) throws IOException { assert fsn.hasWriteLock(); boolean overwrite = flag.contains(CreateFlag.OVERWRITE); @@ -396,7 +397,7 @@ static HdfsFileStatus startFile( if (parent != null) { iip = addFile(fsd, parent, iip.getLastLocalName(), permissions, replication, blockSize, holder, clientMachine, shouldReplicate, - ecPolicyName); + ecPolicyName, storagePolicy); newNode = iip != null ? iip.getLastINode().asFile() : null; } if (newNode == null) { @@ -540,7 +541,7 @@ private static INodesInPath addFile( FSDirectory fsd, INodesInPath existing, byte[] localName, PermissionStatus permissions, short replication, long preferredBlockSize, String clientName, String clientMachine, boolean shouldReplicate, - String ecPolicyName) throws IOException { + String ecPolicyName, String storagePolicy) throws IOException { Preconditions.checkNotNull(existing); long modTime = now(); @@ -549,6 +550,16 @@ private static INodesInPath addFile( try { boolean isStriped = false; ErasureCodingPolicy ecPolicy = null; + byte storagepolicyid = 0; + if (storagePolicy != null && !storagePolicy.isEmpty()) { + BlockStoragePolicy policy = + fsd.getBlockManager().getStoragePolicy(storagePolicy); + if (policy == null) { + throw new HadoopIllegalArgumentException( + "Cannot find a block policy with the name " + storagePolicy); + } + storagepolicyid = policy.getId(); + } if (!shouldReplicate) { ecPolicy = FSDirErasureCodingOp.getErasureCodingPolicy( fsd.getFSNamesystem(), ecPolicyName, existing); @@ -562,7 +573,7 @@ private static INodesInPath addFile( final Byte ecPolicyID = (isStriped ? ecPolicy.getId() : null); INodeFile newNode = newINodeFile(fsd.allocateNewInodeId(), permissions, modTime, modTime, replicationFactor, ecPolicyID, preferredBlockSize, - blockType); + storagepolicyid, blockType); newNode.setLocalName(localName); newNode.toUnderConstruction(clientName, clientMachine); newiip = fsd.addINode(existing, newNode, permissions.getPermission()); @@ -740,13 +751,6 @@ private static INodeFile newINodeFile( storagePolicyId, blockType); } - private static INodeFile newINodeFile(long id, PermissionStatus permissions, - long mtime, long atime, Short replication, Byte ecPolicyID, - long preferredBlockSize, BlockType blockType) { - return newINodeFile(id, permissions, mtime, atime, replication, ecPolicyID, - preferredBlockSize, (byte)0, blockType); - } - /** * Persist the new block (the last block of the given file). */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index ea7db4d04b..8659ea4f7b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -2441,13 +2441,13 @@ HdfsFileStatus startFile(String src, PermissionStatus permissions, String holder, String clientMachine, EnumSet flag, boolean createParent, short replication, long blockSize, CryptoProtocolVersion[] supportedVersions, String ecPolicyName, - boolean logRetryCache) throws IOException { + String storagePolicy, boolean logRetryCache) throws IOException { HdfsFileStatus status; try { status = startFileInt(src, permissions, holder, clientMachine, flag, createParent, replication, blockSize, supportedVersions, ecPolicyName, - logRetryCache); + storagePolicy, logRetryCache); } catch (AccessControlException e) { logAuditEvent(false, "create", src); throw e; @@ -2460,7 +2460,8 @@ private HdfsFileStatus startFileInt(String src, PermissionStatus permissions, String holder, String clientMachine, EnumSet flag, boolean createParent, short replication, long blockSize, CryptoProtocolVersion[] supportedVersions, - String ecPolicyName, boolean logRetryCache) throws IOException { + String ecPolicyName, String storagePolicy, boolean logRetryCache) + throws IOException { if (NameNode.stateChangeLog.isDebugEnabled()) { StringBuilder builder = new StringBuilder(); builder.append("DIR* NameSystem.startFile: src=").append(src) @@ -2549,7 +2550,8 @@ private HdfsFileStatus startFileInt(String src, try { stat = FSDirWriteFileOp.startFile(this, iip, permissions, holder, clientMachine, flag, createParent, replication, blockSize, feInfo, - toRemoveBlocks, shouldReplicate, ecPolicyName, logRetryCache); + toRemoveBlocks, shouldReplicate, ecPolicyName, storagePolicy, + logRetryCache); } catch (IOException e) { skipSync = e instanceof StandbyException; throw e; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index 0da8099f10..f50648d9b6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -780,7 +780,8 @@ public FsServerDefaults getServerDefaults() throws IOException { public HdfsFileStatus create(String src, FsPermission masked, String clientName, EnumSetWritable flag, boolean createParent, short replication, long blockSize, - CryptoProtocolVersion[] supportedVersions, String ecPolicyName) + CryptoProtocolVersion[] supportedVersions, String ecPolicyName, + String storagePolicy) throws IOException { checkNNStartup(); String clientMachine = getClientMachine(); @@ -804,7 +805,7 @@ public HdfsFileStatus create(String src, FsPermission masked, .getShortUserName(), null, masked); status = namesystem.startFile(src, perm, clientName, clientMachine, flag.get(), createParent, replication, blockSize, supportedVersions, - ecPolicyName, cacheEntry != null); + ecPolicyName, storagePolicy, cacheEntry != null); } finally { RetryCache.setState(cacheEntry, status != null, status); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java index e3cab7a38d..8886eee74b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java @@ -2137,7 +2137,7 @@ public static void createStripedFile(MiniDFSCluster cluster, Path file, .create(file.toString(), new FsPermission((short)0755), dfs.getClient().getClientName(), new EnumSetWritable<>(EnumSet.of(CreateFlag.CREATE)), - false, (short)1, 128*1024*1024L, null, null); + false, (short) 1, 128 * 1024 * 1024L, null, null, null); FSNamesystem ns = cluster.getNamesystem(); FSDirectory fsdir = ns.getFSDirectory(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java index 6dfd86c68b..a27bfc1003 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java @@ -276,7 +276,7 @@ public Object answer(InvocationOnMock invocation) .build()) .when(mockNN) .create(anyString(), any(), anyString(), any(), anyBoolean(), - anyShort(), anyLong(), any(), any()); + anyShort(), anyLong(), any(), any(), any()); final DFSClient client = new DFSClient(null, mockNN, conf, null); OutputStream os = client.create("testfile", true); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java index 629abd7d9d..60ff61406d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java @@ -1449,7 +1449,46 @@ public void testFileCloseStatus() throws IOException { cluster.shutdown(); } } - + + @Test + public void testCreateWithStoragePolicy() throws Throwable { + Configuration conf = new HdfsConfiguration(); + try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) + .storageTypes( + new StorageType[] {StorageType.DISK, StorageType.ARCHIVE, + StorageType.SSD}).storagesPerDatanode(3).build()) { + DistributedFileSystem fs = cluster.getFileSystem(); + Path file1 = new Path("/tmp/file1"); + Path file2 = new Path("/tmp/file2"); + fs.mkdirs(new Path("/tmp")); + fs.setStoragePolicy(new Path("/tmp"), "ALL_SSD"); + FSDataOutputStream outputStream = fs.createFile(file1) + .storagePolicyName("COLD").build(); + outputStream.write(1); + outputStream.close(); + assertEquals(StorageType.ARCHIVE, DFSTestUtil.getAllBlocks(fs, file1) + .get(0).getStorageTypes()[0]); + assertEquals(fs.getStoragePolicy(file1).getName(), "COLD"); + + // Check with storage policy not specified. + outputStream = fs.createFile(file2).build(); + outputStream.write(1); + outputStream.close(); + assertEquals(StorageType.SSD, DFSTestUtil.getAllBlocks(fs, file2).get(0) + .getStorageTypes()[0]); + assertEquals(fs.getStoragePolicy(file2).getName(), "ALL_SSD"); + + // Check with default storage policy. + outputStream = fs.createFile(new Path("/default")).build(); + outputStream.write(1); + outputStream.close(); + assertEquals(StorageType.DISK, + DFSTestUtil.getAllBlocks(fs, new Path("/default")).get(0) + .getStorageTypes()[0]); + assertEquals(fs.getStoragePolicy(new Path("/default")).getName(), "HOT"); + } + } + @Test(timeout=60000) public void testListFiles() throws IOException { Configuration conf = new HdfsConfiguration(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptionZones.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptionZones.java index 7ad25b7d0c..d401380ceb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptionZones.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptionZones.java @@ -963,7 +963,7 @@ private static void mockCreate(ClientProtocol mcp, .build()) .when(mcp) .create(anyString(), any(), anyString(), any(), anyBoolean(), - anyShort(), anyLong(), any(), any()); + anyShort(), anyLong(), any(), any(), any()); } // This test only uses mocks. Called from the end of an existing test to diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java index 89aa9ba089..93687b680a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java @@ -1246,7 +1246,7 @@ private void doCreateTest(CreationMethod method) throws Exception { try { nnrpc.create(pathStr, new FsPermission((short)0755), "client", new EnumSetWritable(EnumSet.of(CreateFlag.CREATE)), - true, (short)1, 128*1024*1024L, null, null); + true, (short) 1, 128 * 1024 * 1024L, null, null, null); fail("Should have thrown exception when creating '" + pathStr + "'" + " by " + method); } catch (InvalidPathException ipe) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLease.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLease.java index 19a4bb6d59..909a072d34 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLease.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLease.java @@ -372,7 +372,7 @@ public void testFactory() throws Exception { .build()) .when(mcp) .create(anyString(), any(), anyString(), - any(), anyBoolean(), anyShort(), anyLong(), any(), any()); + any(), anyBoolean(), anyShort(), anyLong(), any(), any(), any()); final Configuration conf = new Configuration(); final DFSClient c1 = createDFSClientAs(ugi[0], conf); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java index 654a8a5c8e..e5d9826c3e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java @@ -593,7 +593,8 @@ long executeOp(int daemonId, int inputIdx, String clientName) FsPermission.getDefault(), clientName, new EnumSetWritable(EnumSet .of(CreateFlag.CREATE, CreateFlag.OVERWRITE)), true, - replication, BLOCK_SIZE, CryptoProtocolVersion.supported(), null); + replication, BLOCK_SIZE, CryptoProtocolVersion.supported(), null, + null); long end = Time.now(); for (boolean written = !closeUponCreate; !written; written = clientProto.complete(fileNames[daemonId][inputIdx], @@ -1143,7 +1144,7 @@ void generateInputs(int[] ignore) throws IOException { String fileName = nameGenerator.getNextFileName("ThroughputBench"); clientProto.create(fileName, FsPermission.getDefault(), clientName, new EnumSetWritable(EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)), true, replication, - BLOCK_SIZE, CryptoProtocolVersion.supported(), null); + BLOCK_SIZE, CryptoProtocolVersion.supported(), null, null); ExtendedBlock lastBlock = addBlocks(fileName, clientName); clientProto.complete(fileName, clientName, lastBlock, HdfsConstants.GRANDFATHER_INODE_ID); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockRetry.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockRetry.java index 13cd16f71a..088a47e893 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockRetry.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockRetry.java @@ -86,7 +86,7 @@ public void testRetryAddBlockWhileInChooseTarget() throws Exception { nn.create(src, FsPermission.getFileDefault(), "clientName", new EnumSetWritable(EnumSet.of(CreateFlag.CREATE)), - true, (short)3, 1024, null, null); + true, (short) 3, 1024, null, null, null); // start first addBlock() LOG.info("Starting first addBlock for " + src); @@ -158,7 +158,7 @@ public void testAddBlockRetryShouldReturnBlockWithLocations() // create file nameNodeRpc.create(src, FsPermission.getFileDefault(), "clientName", new EnumSetWritable(EnumSet.of(CreateFlag.CREATE)), true, - (short) 3, 1024, null, null); + (short) 3, 1024, null, null, null); // start first addBlock() LOG.info("Starting first addBlock for " + src); LocatedBlock lb1 = nameNodeRpc.addBlock(src, "clientName", null, null, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockPlacementPolicyRackFaultTolerant.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockPlacementPolicyRackFaultTolerant.java index 7cef64be22..f1e59f5c65 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockPlacementPolicyRackFaultTolerant.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockPlacementPolicyRackFaultTolerant.java @@ -109,7 +109,7 @@ private void doTestChooseTargetNormalCase() throws Exception { // Create the file with client machine HdfsFileStatus fileStatus = namesystem.startFile(src, perm, clientMachine, clientMachine, EnumSet.of(CreateFlag.CREATE), true, - replication, DEFAULT_BLOCK_SIZE, null, null, false); + replication, DEFAULT_BLOCK_SIZE, null, null, null, false); //test chooseTarget for new file LocatedBlock locatedBlock = nameNodeRpc.addBlock(src, clientMachine, @@ -139,7 +139,7 @@ private void doTestChooseTargetSpecialCase() throws Exception { // Create the file with client machine HdfsFileStatus fileStatus = namesystem.startFile(src, perm, clientMachine, clientMachine, EnumSet.of(CreateFlag.CREATE), true, - (short) 20, DEFAULT_BLOCK_SIZE, null, null, false); + (short) 20, DEFAULT_BLOCK_SIZE, null, null, null, false); //test chooseTarget for new file LocatedBlock locatedBlock = nameNodeRpc.addBlock(src, clientMachine, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDefaultBlockPlacementPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDefaultBlockPlacementPolicy.java index 205593f597..c93339bd84 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDefaultBlockPlacementPolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDefaultBlockPlacementPolicy.java @@ -131,7 +131,7 @@ public void testPlacementWithDFSNetworkTopology() throws Exception { // Create the file with client machine HdfsFileStatus fileStatus = namesystem.startFile(src, perm, clientMachine, clientMachine, EnumSet.of(CreateFlag.CREATE), true, REPLICATION_FACTOR, - DEFAULT_BLOCK_SIZE, null, null, false); + DEFAULT_BLOCK_SIZE, null, null, null, false); LocatedBlock locatedBlock = nameNodeRpc.addBlock(src, clientMachine, null, null, fileStatus.getFileId(), null, null); @@ -184,7 +184,7 @@ private void testPlacement(String clientMachine, // Create the file with client machine HdfsFileStatus fileStatus = namesystem.startFile(src, perm, clientMachine, clientMachine, EnumSet.of(CreateFlag.CREATE), true, - REPLICATION_FACTOR, DEFAULT_BLOCK_SIZE, null, null, false); + REPLICATION_FACTOR, DEFAULT_BLOCK_SIZE, null, null, null, false); LocatedBlock locatedBlock = nameNodeRpc.addBlock(src, clientMachine, null, null, fileStatus.getFileId(), null, null); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java index 0995f135d9..7f6f399023 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java @@ -232,19 +232,19 @@ public void testCreate() throws Exception { newCall(); HdfsFileStatus status = nnRpc.create(src, perm, "holder", new EnumSetWritable(EnumSet.of(CreateFlag.CREATE)), true, - (short) 1, BlockSize, null, null); + (short) 1, BlockSize, null, null, null); Assert.assertEquals(status, nnRpc.create(src, perm, "holder", new EnumSetWritable(EnumSet.of(CreateFlag.CREATE)), true, - (short) 1, BlockSize, null, null)); + (short) 1, BlockSize, null, null, null)); Assert.assertEquals(status, nnRpc.create(src, perm, "holder", new EnumSetWritable(EnumSet.of(CreateFlag.CREATE)), true, - (short) 1, BlockSize, null, null)); + (short) 1, BlockSize, null, null, null)); // A non-retried call fails newCall(); try { nnRpc.create(src, perm, "holder", new EnumSetWritable(EnumSet.of(CreateFlag.CREATE)), - true, (short) 1, BlockSize, null, null); + true, (short) 1, BlockSize, null, null, null); Assert.fail("testCreate - expected exception is not thrown"); } catch (IOException e) { // expected diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java index 3014778993..eac3659710 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java @@ -414,7 +414,7 @@ void invoke() throws Exception { new EnumSetWritable(createFlag), false, DataNodes, BlockSize, new CryptoProtocolVersion[] {CryptoProtocolVersion.ENCRYPTION_ZONES}, - null); + null, null); } @Override