From a7312715a66dec5173c3a0a78dff4e0333e7f0b1 Mon Sep 17 00:00:00 2001 From: Andrew Wang Date: Wed, 12 Apr 2017 12:27:34 -0700 Subject: [PATCH] HDFS-10996. Ability to specify per-file EC policy at create time. Contributed by SammiChen. --- .../org/apache/hadoop/hdfs/DFSClient.java | 23 +++++++- .../apache/hadoop/hdfs/DFSOutputStream.java | 5 +- .../hadoop/hdfs/DistributedFileSystem.java | 34 +++++++++--- .../hadoop/hdfs/protocol/ClientProtocol.java | 6 ++- .../ClientNamenodeProtocolTranslatorPB.java | 5 +- .../main/proto/ClientNamenodeProtocol.proto | 1 + ...amenodeProtocolServerSideTranslatorPB.java | 3 +- .../server/namenode/FSDirErasureCodingOp.java | 54 ++++++++++++------- .../server/namenode/FSDirWriteFileOp.java | 17 ++++-- .../hdfs/server/namenode/FSNamesystem.java | 16 +++--- .../server/namenode/NameNodeRpcServer.java | 6 +-- .../org/apache/hadoop/hdfs/DFSTestUtil.java | 2 +- .../hadoop/hdfs/TestDFSClientRetries.java | 3 +- .../hadoop/hdfs/TestEncryptionZones.java | 3 +- .../hdfs/TestErasureCodingPolicies.java | 45 ++++++++++++++++ .../apache/hadoop/hdfs/TestFileCreation.java | 2 +- .../org/apache/hadoop/hdfs/TestLease.java | 3 +- .../namenode/NNThroughputBenchmark.java | 16 +++--- .../server/namenode/TestAddBlockRetry.java | 4 +- ...BlockPlacementPolicyRackFaultTolerant.java | 4 +- .../TestDefaultBlockPlacementPolicy.java | 2 +- .../namenode/TestNamenodeRetryCache.java | 17 +++--- .../namenode/ha/TestRetryCacheWithHA.java | 3 +- 23 files changed, 200 insertions(+), 74 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index 5bc38e8e72..ef499505ec 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -1190,13 +1190,31 @@ public DFSOutputStream create(String src, FsPermission permission, long blockSize, Progressable progress, int buffersize, ChecksumOpt checksumOpt, InetSocketAddress[] favoredNodes) throws IOException { + return create(src, permission, flag, createParent, replication, blockSize, + progress, buffersize, checksumOpt, favoredNodes, null); + } + + + /** + * Same as {@link #create(String, FsPermission, EnumSet, boolean, short, long, + * Progressable, int, ChecksumOpt, InetSocketAddress[])} with the addition of + * ecPolicyName that is used to specify a specific erasure coding policy + * instead of inheriting any policy from this new file's parent directory. + * This policy will be persisted in HDFS. A value of null means inheriting + * the parent directory's policy.
+ */ + public DFSOutputStream create(String src, FsPermission permission, + EnumSet flag, boolean createParent, short replication, + long blockSize, Progressable progress, int buffersize, + ChecksumOpt checksumOpt, InetSocketAddress[] favoredNodes, + String ecPolicyName) throws IOException { checkOpen(); final FsPermission masked = applyUMask(permission); LOG.debug("{}: masked={}", src, masked); final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this, src, masked, flag, createParent, replication, blockSize, progress, dfsClientConf.createChecksum(checksumOpt), - getFavoredNodesStr(favoredNodes)); + getFavoredNodesStr(favoredNodes), ecPolicyName); beginFileLease(result.getFileId(), result); return result; } @@ -1249,7 +1267,8 @@ public DFSOutputStream primitiveCreate(String src, FsPermission absPermission, if (result == null) { DataChecksum checksum = dfsClientConf.createChecksum(checksumOpt); result = DFSOutputStream.newStreamForCreate(this, src, absPermission, - flag, createParent, replication, blockSize, progress, checksum, null); + flag, createParent, replication, blockSize, progress, checksum, + null, null); } beginFileLease(result.getFileId(), result); return result; diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java index 9a52fbefa0..ceaefd80b1 100755 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java @@ -255,7 +255,8 @@ protected DFSOutputStream(DFSClient dfsClient, String src, static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src, FsPermission masked, EnumSet flag, boolean createParent, short replication, long blockSize, Progressable progress, - DataChecksum checksum, String[] favoredNodes) throws IOException { + DataChecksum checksum, String[] favoredNodes, String ecPolicyName) + throws IOException { try (TraceScope ignored = dfsClient.newPathTraceScope("newStreamForCreate", src)) { HdfsFileStatus stat = null; @@ -269,7 +270,7 @@ static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src, try { stat = dfsClient.namenode.create(src, masked, dfsClient.clientName, new EnumSetWritable<>(flag), createParent, replication, - blockSize, SUPPORTED_CRYPTO_VERSIONS); + blockSize, SUPPORTED_CRYPTO_VERSIONS, ecPolicyName); break; } catch (RemoteException re) { IOException e = re.unwrapRemoteException( diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java index 2336fab756..11d7eb8977 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java @@ -100,6 +100,8 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import javax.annotation.Nonnull; + /**************************************************************** * Implementation of the abstract FileSystem for the DFS system. * This object is the way end-user code interacts with a Hadoop @@ -456,13 +458,18 @@ public FSDataOutputStream next(final FileSystem fs, final Path p) * at the creation time only. 
And with favored nodes, blocks will be pinned * on the datanodes to prevent balancing move the block. HDFS could move the * blocks during replication, to move the blocks from favored nodes. A value - * of null means no favored nodes for this create + * of null means no favored nodes for this create. + * Another addition is ecPolicyName. A non-null ecPolicyName specifies an + * explicit erasure coding policy for this file, overriding the inherited + * policy. A null ecPolicyName means the file will inherit its EC policy from + * an ancestor (the default). */ private HdfsDataOutputStream create(final Path f, - final FsPermission permission, EnumSet flag, + final FsPermission permission, final EnumSet flag, final int bufferSize, final short replication, final long blockSize, final Progressable progress, final ChecksumOpt checksumOpt, - final InetSocketAddress[] favoredNodes) throws IOException { + final InetSocketAddress[] favoredNodes, final String ecPolicyName) + throws IOException { statistics.incrementWriteOps(1); storageStatistics.incrementOpCounter(OpType.CREATE); Path absF = fixRelativePart(f); @@ -471,7 +478,7 @@ private HdfsDataOutputStream create(final Path f, public HdfsDataOutputStream doCall(final Path p) throws IOException { final DFSOutputStream out = dfs.create(getPathName(f), permission, flag, true, replication, blockSize, progress, bufferSize, - checksumOpt, favoredNodes); + checksumOpt, favoredNodes, ecPolicyName); return dfs.createWrappedOutputStream(out, statistics); } @Override @@ -480,7 +487,7 @@ public HdfsDataOutputStream next(final FileSystem fs, final Path p) if (fs instanceof DistributedFileSystem) { DistributedFileSystem myDfs = (DistributedFileSystem)fs; return myDfs.create(p, permission, flag, bufferSize, replication, - blockSize, progress, checksumOpt, favoredNodes); + blockSize, progress, checksumOpt, favoredNodes, ecPolicyName); } throw new UnsupportedOperationException("Cannot create with" + " favoredNodes through a symlink to a non-DistributedFileSystem: " @@ -2645,6 +2652,7 @@ public static class HdfsDataOutputStreamBuilder extends FSDataOutputStreamBuilder { private final DistributedFileSystem dfs; private InetSocketAddress[] favoredNodes = null; + private String ecPolicyName = null; public HdfsDataOutputStreamBuilder(DistributedFileSystem dfs, Path path) { super(dfs, path); @@ -2656,17 +2664,29 @@ protected InetSocketAddress[] getFavoredNodes() { } public HdfsDataOutputStreamBuilder setFavoredNodes( - final InetSocketAddress[] nodes) { + @Nonnull final InetSocketAddress[] nodes) { Preconditions.checkNotNull(nodes); favoredNodes = nodes.clone(); return this; } + protected String getEcPolicyName() { + return ecPolicyName; + } + + public HdfsDataOutputStreamBuilder setEcPolicyName( + @Nonnull final String policyName) { + Preconditions.checkNotNull(policyName); + ecPolicyName = policyName; + return this; + } + @Override public HdfsDataOutputStream build() throws IOException { return dfs.create(getPath(), getPermission(), getFlags(), getBufferSize(), getReplication(), getBlockSize(), - getProgress(), getChecksumOpt(), getFavoredNodes()); + getProgress(), getChecksumOpt(), getFavoredNodes(), + getEcPolicyName()); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java index 041d226840..6db37b8f05 100644 --- 
a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java @@ -160,6 +160,10 @@ LocatedBlocks getBlockLocations(String src, long offset, long length) * @param replication block replication factor. * @param blockSize maximum block size. * @param supportedVersions CryptoProtocolVersions supported by the client + * @param ecPolicyName the name of erasure coding policy. A null value means + * this file will inherit its parent directory's policy, + * either traditional replication or erasure coding + * policy. * * @return the status of the created file, it could be null if the server * doesn't support returning the file status @@ -193,7 +197,7 @@ LocatedBlocks getBlockLocations(String src, long offset, long length) HdfsFileStatus create(String src, FsPermission masked, String clientName, EnumSetWritable flag, boolean createParent, short replication, long blockSize, - CryptoProtocolVersion[] supportedVersions) + CryptoProtocolVersion[] supportedVersions, String ecPolicyName) throws IOException; /** diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java index da4a17f579..c3708f9878 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java @@ -285,7 +285,7 @@ public FsServerDefaults getServerDefaults() throws IOException { public HdfsFileStatus create(String src, FsPermission masked, String clientName, EnumSetWritable flag, boolean createParent, short replication, long blockSize, - CryptoProtocolVersion[] supportedVersions) + CryptoProtocolVersion[] supportedVersions, String ecPolicyName) throws IOException { CreateRequestProto.Builder builder = CreateRequestProto.newBuilder() .setSrc(src) @@ -295,6 +295,9 @@ public HdfsFileStatus create(String src, FsPermission masked, .setCreateParent(createParent) .setReplication(replication) .setBlockSize(blockSize); + if (ecPolicyName != null) { + builder.setEcPolicyName(ecPolicyName); + } FsPermission unmasked = masked.getUnmasked(); if (unmasked != null) { builder.setUnmasked(PBHelperClient.convert(unmasked)); diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto index ff4db03590..44f1c3373b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto @@ -80,6 +80,7 @@ message CreateRequestProto { required uint64 blockSize = 7; repeated CryptoProtocolVersionProto cryptoProtocolVersion = 8; optional FsPermissionProto unmasked = 9; + optional string ecPolicyName = 10; } message CreateResponseProto { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java index 1944fe7a28..ab0ccdb1ea 100644 --- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java @@ -424,7 +424,8 @@ public CreateResponseProto create(RpcController controller, PBHelperClient.convertCreateFlag(req.getCreateFlag()), req.getCreateParent(), (short) req.getReplication(), req.getBlockSize(), PBHelperClient.convertCryptoProtocolVersions( - req.getCryptoProtocolVersionList())); + req.getCryptoProtocolVersionList()), + req.getEcPolicyName()); if (result != null) { return CreateResponseProto.newBuilder().setFs(PBHelperClient.convert(result)) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirErasureCodingOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirErasureCodingOp.java index aa9772d582..763b935b30 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirErasureCodingOp.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirErasureCodingOp.java @@ -58,6 +58,39 @@ final class FSDirErasureCodingOp { */ private FSDirErasureCodingOp() {} + /** + * Check if the ecPolicyName is valid and enabled, return the corresponding + * EC policy if is. + * @param fsn namespace + * @param ecPolicyName name of EC policy to be checked + * @return an erasure coding policy if ecPolicyName is valid and enabled + * @throws IOException + */ + static ErasureCodingPolicy getErasureCodingPolicyByName( + final FSNamesystem fsn, final String ecPolicyName) throws IOException { + assert fsn.hasReadLock(); + ErasureCodingPolicy ecPolicy = fsn.getErasureCodingPolicyManager() + .getEnabledPolicyByName(ecPolicyName); + if (ecPolicy == null) { + final String sysPolicies = + Arrays.asList( + fsn.getErasureCodingPolicyManager().getEnabledPolicies()) + .stream() + .map(ErasureCodingPolicy::getName) + .collect(Collectors.joining(", ")); + final String message = String.format("Policy '%s' does not match any " + + "enabled erasure" + + " coding policies: [%s]. The set of enabled erasure coding " + + "policies can be configured at '%s'.", + ecPolicyName, + sysPolicies, + DFSConfigKeys.DFS_NAMENODE_EC_POLICIES_ENABLED_KEY + ); + throw new HadoopIllegalArgumentException(message); + } + return ecPolicy; + } + /** * Set an erasure coding policy on the given path. * @@ -84,25 +117,8 @@ static HdfsFileStatus setErasureCodingPolicy(final FSNamesystem fsn, List xAttrs; fsd.writeLock(); try { - ErasureCodingPolicy ecPolicy = fsn.getErasureCodingPolicyManager() - .getEnabledPolicyByName(ecPolicyName); - if (ecPolicy == null) { - final String sysPolicies = - Arrays.asList( - fsn.getErasureCodingPolicyManager().getEnabledPolicies()) - .stream() - .map(ErasureCodingPolicy::getName) - .collect(Collectors.joining(", ")); - final String message = String.format("Policy '%s' does not match any " + - "enabled erasure" + - " coding policies: [%s]. 
The set of enabled erasure coding " + - "policies can be configured at '%s'.", - ecPolicyName, - sysPolicies, - DFSConfigKeys.DFS_NAMENODE_EC_POLICIES_ENABLED_KEY - ); - throw new HadoopIllegalArgumentException(message); - } + ErasureCodingPolicy ecPolicy = getErasureCodingPolicyByName(fsn, + ecPolicyName); iip = fsd.resolvePath(pc, src, DirOp.WRITE_LINK); // Write access is required to set erasure coding policy if (fsd.isPermissionEnabled()) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java index bb920049b3..7bf291634d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdfs.server.namenode; import com.google.common.base.Preconditions; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.hdfs.AddBlockFlag; import org.apache.hadoop.fs.CreateFlag; @@ -351,7 +352,7 @@ static HdfsFileStatus startFile( EnumSet flag, boolean createParent, short replication, long blockSize, FileEncryptionInfo feInfo, INode.BlocksMapUpdateInfo toRemoveBlocks, - boolean logRetryEntry) + String ecPolicyName, boolean logRetryEntry) throws IOException { assert fsn.hasWriteLock(); boolean overwrite = flag.contains(CreateFlag.OVERWRITE); @@ -385,7 +386,7 @@ static HdfsFileStatus startFile( FSDirMkdirOp.createAncestorDirectories(fsd, iip, permissions); if (parent != null) { iip = addFile(fsd, parent, iip.getLastLocalName(), permissions, - replication, blockSize, holder, clientMachine); + replication, blockSize, holder, clientMachine, ecPolicyName); newNode = iip != null ? iip.getLastINode().asFile() : null; } if (newNode == null) { @@ -521,7 +522,7 @@ private static BlockInfo addBlock(FSDirectory fsd, String path, private static INodesInPath addFile( FSDirectory fsd, INodesInPath existing, byte[] localName, PermissionStatus permissions, short replication, long preferredBlockSize, - String clientName, String clientMachine) + String clientName, String clientMachine, String ecPolicyName) throws IOException { Preconditions.checkNotNull(existing); @@ -530,8 +531,14 @@ private static INodesInPath addFile( fsd.writeLock(); try { boolean isStriped = false; - ErasureCodingPolicy ecPolicy = FSDirErasureCodingOp. 
- unprotectedGetErasureCodingPolicy(fsd.getFSNamesystem(), existing); + ErasureCodingPolicy ecPolicy; + if (!StringUtils.isEmpty(ecPolicyName)) { + ecPolicy = FSDirErasureCodingOp.getErasureCodingPolicyByName( + fsd.getFSNamesystem(), ecPolicyName); + } else { + ecPolicy = FSDirErasureCodingOp.unprotectedGetErasureCodingPolicy( + fsd.getFSNamesystem(), existing); + } if (ecPolicy != null) { isStriped = true; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index e004b3f1ee..e24778f9c9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -2180,14 +2180,14 @@ CryptoProtocolVersion chooseProtocolVersion( */ HdfsFileStatus startFile(String src, PermissionStatus permissions, String holder, String clientMachine, EnumSet flag, - boolean createParent, short replication, long blockSize, - CryptoProtocolVersion[] supportedVersions, boolean logRetryCache) - throws IOException { + boolean createParent, short replication, long blockSize, + CryptoProtocolVersion[] supportedVersions, String ecPolicyName, + boolean logRetryCache) throws IOException { HdfsFileStatus status; try { status = startFileInt(src, permissions, holder, clientMachine, flag, - createParent, replication, blockSize, supportedVersions, + createParent, replication, blockSize, supportedVersions, ecPolicyName, logRetryCache); } catch (AccessControlException e) { logAuditEvent(false, "create", src); @@ -2201,8 +2201,7 @@ private HdfsFileStatus startFileInt(String src, PermissionStatus permissions, String holder, String clientMachine, EnumSet flag, boolean createParent, short replication, long blockSize, CryptoProtocolVersion[] supportedVersions, - boolean logRetryCache) - throws IOException { + String ecPolicyName, boolean logRetryCache) throws IOException { if (NameNode.stateChangeLog.isDebugEnabled()) { StringBuilder builder = new StringBuilder(); builder.append("DIR* NameSystem.startFile: src=").append(src) @@ -2270,9 +2269,8 @@ private HdfsFileStatus startFileInt(String src, dir.writeLock(); try { stat = FSDirWriteFileOp.startFile(this, iip, permissions, holder, - clientMachine, flag, createParent, - replication, blockSize, feInfo, - toRemoveBlocks, logRetryCache); + clientMachine, flag, createParent, replication, blockSize, feInfo, + toRemoveBlocks, ecPolicyName, logRetryCache); } catch (IOException e) { skipSync = e instanceof StandbyException; throw e; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index f792e8a9a4..e477b81062 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -722,8 +722,8 @@ public FsServerDefaults getServerDefaults() throws IOException { @Override // ClientProtocol public HdfsFileStatus create(String src, FsPermission masked, String clientName, EnumSetWritable flag, - boolean createParent, short replication, long blockSize, - CryptoProtocolVersion[] supportedVersions) + boolean createParent, short 
replication, long blockSize, + CryptoProtocolVersion[] supportedVersions, String ecPolicyName) throws IOException { checkNNStartup(); String clientMachine = getClientMachine(); @@ -747,7 +747,7 @@ public HdfsFileStatus create(String src, FsPermission masked, .getShortUserName(), null, masked); status = namesystem.startFile(src, perm, clientName, clientMachine, flag.get(), createParent, replication, blockSize, supportedVersions, - cacheEntry != null); + ecPolicyName, cacheEntry != null); } finally { RetryCache.setState(cacheEntry, status != null, status); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java index 445e19dedc..9dccad5a96 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java @@ -1966,7 +1966,7 @@ public static void createStripedFile(MiniDFSCluster cluster, Path file, .create(file.toString(), new FsPermission((short)0755), dfs.getClient().getClientName(), new EnumSetWritable<>(EnumSet.of(CreateFlag.CREATE)), - false, (short)1, 128*1024*1024L, null); + false, (short)1, 128*1024*1024L, null, null); FSNamesystem ns = cluster.getNamesystem(); FSDirectory fsdir = ns.getFSDirectory(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java index bb5a8d84d0..7a71df8d90 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java @@ -269,7 +269,8 @@ public Object answer(InvocationOnMock invocation) .when(mockNN) .create(anyString(), (FsPermission) anyObject(), anyString(), (EnumSetWritable) anyObject(), anyBoolean(), - anyShort(), anyLong(), (CryptoProtocolVersion[]) anyObject()); + anyShort(), anyLong(), (CryptoProtocolVersion[]) anyObject(), + anyObject()); final DFSClient client = new DFSClient(null, mockNN, conf, null); OutputStream os = client.create("testfile", true); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptionZones.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptionZones.java index 61cc433f86..1f5173234c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptionZones.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptionZones.java @@ -899,7 +899,8 @@ private static void mockCreate(ClientProtocol mcp, .when(mcp) .create(anyString(), (FsPermission) anyObject(), anyString(), (EnumSetWritable) anyObject(), anyBoolean(), - anyShort(), anyLong(), (CryptoProtocolVersion[]) anyObject()); + anyShort(), anyLong(), (CryptoProtocolVersion[]) anyObject(), + anyObject()); } // This test only uses mocks. 
Called from the end of an existing test to diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestErasureCodingPolicies.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestErasureCodingPolicies.java index c6f089093f..1aee929d83 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestErasureCodingPolicies.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestErasureCodingPolicies.java @@ -34,6 +34,7 @@ import org.apache.hadoop.io.erasurecode.ECSchema; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.test.GenericTestUtils; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -519,4 +520,48 @@ public HdfsAdmin run() throws Exception { noadmin.getErasureCodingPolicies(); superadmin.getErasureCodingPolicies(); } + + /** + * Test apply specific erasure coding policy on single file. Usually file's + * policy is inherited from its parent. + */ + @Test + public void testFileLevelECPolicy() throws Exception { + final Path dirPath = new Path("/striped"); + final Path filePath0 = new Path(dirPath, "file0"); + final Path filePath1 = new Path(dirPath, "file1"); + + fs.mkdirs(dirPath); + fs.setErasureCodingPolicy(dirPath, EC_POLICY.getName()); + + // null EC policy name value means inheriting parent directory's policy + fs.newFSDataOutputStreamBuilder(filePath0).build().close(); + ErasureCodingPolicy ecPolicyOnFile = fs.getErasureCodingPolicy(filePath0); + assertEquals(EC_POLICY, ecPolicyOnFile); + + // Test illegal EC policy name + final String illegalPolicyName = "RS-DEFAULT-1-2-64k"; + try { + fs.newFSDataOutputStreamBuilder(filePath1) + .setEcPolicyName(illegalPolicyName).build().close(); + Assert.fail("illegal erasure coding policy should not be found"); + } catch (Exception e) { + GenericTestUtils.assertExceptionContains("Policy '" + illegalPolicyName + + "' does not match any enabled erasure coding policies", e); + } + fs.delete(dirPath, true); + + // Test create a file with a different EC policy than its parent directory + fs.mkdirs(dirPath); + final ErasureCodingPolicy ecPolicyOnDir = + SystemErasureCodingPolicies.getByID( + SystemErasureCodingPolicies.RS_3_2_POLICY_ID); + ecPolicyOnFile = EC_POLICY; + fs.setErasureCodingPolicy(dirPath, ecPolicyOnDir.getName()); + fs.newFSDataOutputStreamBuilder(filePath0) + .setEcPolicyName(ecPolicyOnFile.getName()).build().close(); + assertEquals(ecPolicyOnFile, fs.getErasureCodingPolicy(filePath0)); + assertEquals(ecPolicyOnDir, fs.getErasureCodingPolicy(dirPath)); + fs.delete(dirPath, true); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java index c04b7be574..9dff529ef7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java @@ -1143,7 +1143,7 @@ private void doCreateTest(CreationMethod method) throws Exception { try { nnrpc.create(pathStr, new FsPermission((short)0755), "client", new EnumSetWritable(EnumSet.of(CreateFlag.CREATE)), - true, (short)1, 128*1024*1024L, null); + true, (short)1, 128*1024*1024L, null, null); fail("Should have thrown exception when creating '" + pathStr + "'" + " by " + method); } catch (InvalidPathException 
ipe) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLease.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLease.java index 20596c56b9..16cdf9b322 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLease.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLease.java @@ -363,7 +363,8 @@ public void testFactory() throws Exception { .when(mcp) .create(anyString(), (FsPermission) anyObject(), anyString(), (EnumSetWritable) anyObject(), anyBoolean(), - anyShort(), anyLong(), (CryptoProtocolVersion[]) anyObject()); + anyShort(), anyLong(), (CryptoProtocolVersion[]) anyObject(), + anyObject()); final Configuration conf = new Configuration(); final DFSClient c1 = createDFSClientAs(ugi[0], conf); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java index c1f0a7be3f..3a3c47177a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java @@ -587,14 +587,16 @@ long executeOp(int daemonId, int inputIdx, String clientName) throws IOException { long start = Time.now(); // dummyActionNoSynch(fileIdx); - clientProto.create(fileNames[daemonId][inputIdx], FsPermission.getDefault(), - clientName, new EnumSetWritable(EnumSet - .of(CreateFlag.CREATE, CreateFlag.OVERWRITE)), true, - replication, BLOCK_SIZE, CryptoProtocolVersion.supported()); + clientProto.create(fileNames[daemonId][inputIdx], + FsPermission.getDefault(), clientName, + new EnumSetWritable(EnumSet + .of(CreateFlag.CREATE, CreateFlag.OVERWRITE)), true, + replication, BLOCK_SIZE, CryptoProtocolVersion.supported(), null); long end = Time.now(); - for(boolean written = !closeUponCreate; !written; + for (boolean written = !closeUponCreate; !written; written = clientProto.complete(fileNames[daemonId][inputIdx], - clientName, null, HdfsConstants.GRANDFATHER_INODE_ID)); + clientName, null, HdfsConstants.GRANDFATHER_INODE_ID)) { + }; return end-start; } @@ -1139,7 +1141,7 @@ void generateInputs(int[] ignore) throws IOException { String fileName = nameGenerator.getNextFileName("ThroughputBench"); clientProto.create(fileName, FsPermission.getDefault(), clientName, new EnumSetWritable(EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)), true, replication, - BLOCK_SIZE, CryptoProtocolVersion.supported()); + BLOCK_SIZE, CryptoProtocolVersion.supported(), null); ExtendedBlock lastBlock = addBlocks(fileName, clientName); clientProto.complete(fileName, clientName, lastBlock, HdfsConstants.GRANDFATHER_INODE_ID); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockRetry.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockRetry.java index 94abe3ef83..1aa77266f0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockRetry.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockRetry.java @@ -85,7 +85,7 @@ public void testRetryAddBlockWhileInChooseTarget() throws Exception { nn.create(src, FsPermission.getFileDefault(), "clientName", new 
EnumSetWritable(EnumSet.of(CreateFlag.CREATE)), - true, (short)3, 1024, null); + true, (short)3, 1024, null, null); // start first addBlock() LOG.info("Starting first addBlock for " + src); @@ -157,7 +157,7 @@ public void testAddBlockRetryShouldReturnBlockWithLocations() // create file nameNodeRpc.create(src, FsPermission.getFileDefault(), "clientName", new EnumSetWritable(EnumSet.of(CreateFlag.CREATE)), true, - (short) 3, 1024, null); + (short) 3, 1024, null, null); // start first addBlock() LOG.info("Starting first addBlock for " + src); LocatedBlock lb1 = nameNodeRpc.addBlock(src, "clientName", null, null, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockPlacementPolicyRackFaultTolerant.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockPlacementPolicyRackFaultTolerant.java index f40c464e24..7cef64be22 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockPlacementPolicyRackFaultTolerant.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockPlacementPolicyRackFaultTolerant.java @@ -109,7 +109,7 @@ private void doTestChooseTargetNormalCase() throws Exception { // Create the file with client machine HdfsFileStatus fileStatus = namesystem.startFile(src, perm, clientMachine, clientMachine, EnumSet.of(CreateFlag.CREATE), true, - replication, DEFAULT_BLOCK_SIZE, null, false); + replication, DEFAULT_BLOCK_SIZE, null, null, false); //test chooseTarget for new file LocatedBlock locatedBlock = nameNodeRpc.addBlock(src, clientMachine, @@ -139,7 +139,7 @@ private void doTestChooseTargetSpecialCase() throws Exception { // Create the file with client machine HdfsFileStatus fileStatus = namesystem.startFile(src, perm, clientMachine, clientMachine, EnumSet.of(CreateFlag.CREATE), true, - (short) 20, DEFAULT_BLOCK_SIZE, null, false); + (short) 20, DEFAULT_BLOCK_SIZE, null, null, false); //test chooseTarget for new file LocatedBlock locatedBlock = nameNodeRpc.addBlock(src, clientMachine, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDefaultBlockPlacementPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDefaultBlockPlacementPolicy.java index 1a10b7a535..0931ff44ad 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDefaultBlockPlacementPolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDefaultBlockPlacementPolicy.java @@ -138,7 +138,7 @@ private void testPlacement(String clientMachine, // Create the file with client machine HdfsFileStatus fileStatus = namesystem.startFile(src, perm, clientMachine, clientMachine, EnumSet.of(CreateFlag.CREATE), true, - REPLICATION_FACTOR, DEFAULT_BLOCK_SIZE, null, false); + REPLICATION_FACTOR, DEFAULT_BLOCK_SIZE, null, null, false); LocatedBlock locatedBlock = nameNodeRpc.addBlock(src, clientMachine, null, null, fileStatus.getFileId(), null, null); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java index d7a2c811a5..d217813bd5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java +++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java @@ -223,15 +223,20 @@ public void testCreate() throws Exception { // Two retried calls succeed newCall(); HdfsFileStatus status = nnRpc.create(src, perm, "holder", - new EnumSetWritable(EnumSet.of(CreateFlag.CREATE)), true, - (short) 1, BlockSize, null); - Assert.assertEquals(status, nnRpc.create(src, perm, "holder", new EnumSetWritable(EnumSet.of(CreateFlag.CREATE)), true, (short) 1, BlockSize, null)); - Assert.assertEquals(status, nnRpc.create(src, perm, "holder", new EnumSetWritable(EnumSet.of(CreateFlag.CREATE)), true, (short) 1, BlockSize, null)); - + new EnumSetWritable(EnumSet.of(CreateFlag.CREATE)), true, + (short) 1, BlockSize, null, null); + Assert.assertEquals(status, nnRpc.create(src, perm, "holder", + new EnumSetWritable(EnumSet.of(CreateFlag.CREATE)), true, + (short) 1, BlockSize, null, null)); + Assert.assertEquals(status, nnRpc.create(src, perm, "holder", + new EnumSetWritable(EnumSet.of(CreateFlag.CREATE)), true, + (short) 1, BlockSize, null, null)); // A non-retried call fails newCall(); try { - nnRpc.create(src, perm, "holder", new EnumSetWritable(EnumSet.of(CreateFlag.CREATE)), true, (short) 1, BlockSize, null); + nnRpc.create(src, perm, "holder", + new EnumSetWritable(EnumSet.of(CreateFlag.CREATE)), + true, (short) 1, BlockSize, null, null); Assert.fail("testCreate - expected exception is not thrown"); } catch (IOException e) { // expected diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java index e29d51851e..b40006be73 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java @@ -405,7 +405,8 @@ void invoke() throws Exception { FsPermission.getFileDefault(), client.getClientName(), new EnumSetWritable(createFlag), false, DataNodes, BlockSize, - new CryptoProtocolVersion[] {CryptoProtocolVersion.ENCRYPTION_ZONES}); + new CryptoProtocolVersion[] {CryptoProtocolVersion.ENCRYPTION_ZONES}, + null); } @Override
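The client-facing surface of this change is easiest to see end to end. The sketch below is not part of the patch; it is a minimal usage example modeled on the new testFileLevelECPolicy case above, showing how a caller could pin a per-file erasure coding policy through the HdfsDataOutputStreamBuilder#setEcPolicyName hook added here while the enclosing directory keeps its own policy. It assumes a reachable HDFS cluster (fs.defaultFS) with enough DataNodes for both striped layouts and with the RS(6,3) and RS(3,2) system policies enabled via dfs.namenode.ec.policies.enabled; the class name and paths are illustrative only.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;

public class PerFileEcPolicyExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Assumption: fs.defaultFS points at an HDFS cluster whose enabled EC
    // policies include RS(6,3) and RS(3,2).
    DistributedFileSystem fs = (DistributedFileSystem) FileSystem.get(conf);

    Path dir = new Path("/striped");
    Path file = new Path(dir, "file0");
    fs.mkdirs(dir);

    // Directory-level policy: files created under /striped inherit it.
    ErasureCodingPolicy dirPolicy = SystemErasureCodingPolicies.getByID(
        SystemErasureCodingPolicies.RS_6_3_POLICY_ID);
    fs.setErasureCodingPolicy(dir, dirPolicy.getName());

    // New in this patch: override the inherited policy for a single file at
    // create time via the output stream builder. Omitting setEcPolicyName
    // (a null name on the DFSClient.create() path) keeps the old
    // inherit-from-parent behavior.
    ErasureCodingPolicy filePolicy = SystemErasureCodingPolicies.getByID(
        SystemErasureCodingPolicies.RS_3_2_POLICY_ID);
    try (FSDataOutputStream out = fs.newFSDataOutputStreamBuilder(file)
        .setEcPolicyName(filePolicy.getName())
        .build()) {
      out.writeBytes("per-file EC policy\n");
    }

    // The file carries RS(3,2) while its parent directory keeps RS(6,3).
    System.out.println("dir policy:  "
        + fs.getErasureCodingPolicy(dir).getName());
    System.out.println("file policy: "
        + fs.getErasureCodingPolicy(file).getName());
  }
}

A policy name that does not match any enabled policy is rejected on the NameNode with the HadoopIllegalArgumentException built in FSDirErasureCodingOp#getErasureCodingPolicyByName, which is what the illegal-policy branch of testFileLevelECPolicy asserts.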