From c2a52ef9c29459ff9ef3e23b29e14912bfdb1405 Mon Sep 17 00:00:00 2001 From: Andrew Wang Date: Thu, 4 May 2017 11:39:14 -0700 Subject: [PATCH] HDFS-11643. Add shouldReplicate option to create builder. Contributed by SammiChen. --- .../java/org/apache/hadoop/fs/CreateFlag.java | 8 ++- .../hadoop/hdfs/DistributedFileSystem.java | 59 +++++++++++++++---- .../hadoop/hdfs/protocol/ClientProtocol.java | 10 +++- .../hdfs/protocolPB/PBHelperClient.java | 7 +++ .../main/proto/ClientNamenodeProtocol.proto | 1 + .../server/balancer/NameNodeConnector.java | 13 +++- .../server/namenode/FSDirWriteFileOp.java | 31 +++++----- .../hdfs/server/namenode/FSNamesystem.java | 13 +++- .../hdfs/TestErasureCodingPolicies.java | 58 ++++++++++++++++++ .../hdfs/server/balancer/TestBalancer.java | 2 + 10 files changed, 170 insertions(+), 32 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CreateFlag.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CreateFlag.java index d480fc9f4c..383d65a06a 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CreateFlag.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CreateFlag.java @@ -110,7 +110,13 @@ public enum CreateFlag { * 'local' means the same host as the client is being run on. */ @InterfaceAudience.LimitedPrivate({"HBase"}) - NO_LOCAL_WRITE((short) 0x40); + NO_LOCAL_WRITE((short) 0x40), + + /** + * Enforce the file to be a replicated file, no matter what its parent + * directory's replication or erasure coding policy is. + */ + SHOULD_REPLICATE((short) 0x80); private final short mode; diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java index 429f4c2384..9e89bc5f93 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java @@ -100,6 +100,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import org.apache.commons.lang.StringUtils; import javax.annotation.Nonnull; @@ -462,17 +463,20 @@ public FSDataOutputStream next(final FileSystem fs, final Path p) /** * Same as * {@link #create(Path, FsPermission, EnumSet, int, short, long, - * Progressable, ChecksumOpt)} with the addition of favoredNodes that is a - * hint to where the namenode should place the file blocks. - * The favored nodes hint is not persisted in HDFS. Hence it may be honored - * at the creation time only. And with favored nodes, blocks will be pinned - * on the datanodes to prevent balancing move the block. HDFS could move the - * blocks during replication, to move the blocks from favored nodes. A value - * of null means no favored nodes for this create. - * Another addition is ecPolicyName. A non-null ecPolicyName specifies an + * Progressable, ChecksumOpt)} with a few additions. The first addition is + * favoredNodes, a hint to the namenode about where to place the file + * blocks. The favored nodes hint is not persisted in HDFS, so it may be + * honored at creation time only. With favored nodes, blocks are pinned on + * the chosen datanodes so that the balancer will not move them; HDFS may + * still move the blocks during replication, away from the favored nodes.
+ * A value of null means no favored nodes for this create.
+ * The second addition is ecPolicyName. A non-null ecPolicyName specifies an * explicit erasure coding policy for this file, overriding the inherited - * policy. A null ecPolicyName means the file will inherit its EC policy from - * an ancestor (the default). + * policy. A null ecPolicyName means the file will inherit its EC policy or + * replication policy from its ancestor (the default). + * ecPolicyName and SHOULD_REPLICATE CreateFlag are mutually exclusive. It's + * invalid to set both SHOULD_REPLICATE and a non-null ecPolicyName. + * */ private HdfsDataOutputStream create(final Path f, final FsPermission permission, final EnumSet<CreateFlag> flag, @@ -2669,6 +2673,7 @@ public static class HdfsDataOutputStreamBuilder private final DistributedFileSystem dfs; private InetSocketAddress[] favoredNodes = null; private String ecPolicyName = null; + private boolean shouldReplicate = false; public HdfsDataOutputStreamBuilder(DistributedFileSystem dfs, Path path) { super(dfs, path); @@ -2690,6 +2695,14 @@ protected String getEcPolicyName() { return ecPolicyName; } + /** + * Enforce the file to be a striped file with erasure coding policy + * 'policyName', no matter what its parent directory's replication + * or erasure coding policy is. Don't call this function and + * replicate() on the same builder, since the two settings are + * mutually exclusive. + * + */ public HdfsDataOutputStreamBuilder setEcPolicyName( @Nonnull final String policyName) { Preconditions.checkNotNull(policyName); @@ -2697,9 +2710,33 @@ public HdfsDataOutputStreamBuilder setEcPolicyName( return this; } + public boolean shouldReplicate() { + return shouldReplicate; + } + + /** + * Enforce the file to be a replicated file, no matter what its parent + * directory's replication or erasure coding policy is. Don't call this + * function and setEcPolicyName() on the same builder, since the two + * settings are mutually exclusive. + */ + public HdfsDataOutputStreamBuilder replicate() { + shouldReplicate = true; + return this; + } + @Override public HdfsDataOutputStream build() throws IOException { - return dfs.create(getPath(), getPermission(), getFlags(), + Preconditions.checkState( + !(shouldReplicate() && (!StringUtils.isEmpty(getEcPolicyName()))), + "shouldReplicate and ecPolicyName are " + + "exclusive parameters. Set both is not allowed!"); + + EnumSet<CreateFlag> createFlags = getFlags(); + if (shouldReplicate()) { + createFlags.add(CreateFlag.SHOULD_REPLICATE); + } + return dfs.create(getPath(), getPermission(), createFlags, getBufferSize(), getReplication(), getBlockSize(), getProgress(), getChecksumOpt(), getFavoredNodes(), getEcPolicyName()); diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java index 117b9dd8e4..b178ddcb29 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java @@ -154,8 +154,10 @@ LocatedBlocks getBlockLocations(String src, long offset, long length) * @param src path of the file being created. * @param masked masked permission. * @param clientName name of the current client. - * @param flag indicates whether the file should be - * overwritten if it already exists or create if it does not exist or append.
+ * @param flag indicates whether the file should be overwritten if it already + * exists or create if it does not exist or append, or whether the + * file should be a replicate file, no matter what its ancestor's + * replication or erasure coding policy is. * @param createParent create missing parent directory if true * @param replication block replication factor. * @param blockSize maximum block size. @@ -163,7 +165,9 @@ LocatedBlocks getBlockLocations(String src, long offset, long length) * @param ecPolicyName the name of erasure coding policy. A null value means * this file will inherit its parent directory's policy, * either traditional replication or erasure coding - * policy. + * policy. ecPolicyName and SHOULD_REPLICATE CreateFlag + * are mutually exclusive. It's invalid to set both + * SHOULD_REPLICATE flag and a non-null ecPolicyName. * * @return the status of the created file, it could be null if the server * doesn't support returning the file status diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java index 6ca3541754..2b8f102064 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java @@ -1753,6 +1753,9 @@ public static int convertCreateFlag(EnumSetWritable flag) { if (flag.contains(CreateFlag.NEW_BLOCK)) { value |= CreateFlagProto.NEW_BLOCK.getNumber(); } + if (flag.contains(CreateFlag.SHOULD_REPLICATE)) { + value |= CreateFlagProto.SHOULD_REPLICATE.getNumber(); + } return value; } @@ -1966,6 +1969,10 @@ public static EnumSetWritable convertCreateFlag(int flag) { == CreateFlagProto.NEW_BLOCK_VALUE) { result.add(CreateFlag.NEW_BLOCK); } + if ((flag & CreateFlagProto.SHOULD_REPLICATE.getNumber()) + == CreateFlagProto.SHOULD_REPLICATE.getNumber()) { + result.add(CreateFlag.SHOULD_REPLICATE); + } return new EnumSetWritable<>(result, CreateFlag.class); } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto index b8bd6bfa3d..eee3c4db01 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto @@ -68,6 +68,7 @@ enum CreateFlagProto { APPEND = 0x04; // Append to a file LAZY_PERSIST = 0x10; // File with reduced durability guarantees. 
NEW_BLOCK = 0x20; // Write data to a new block when appending + SHOULD_REPLICATE = 0x80; // Enforce to create a replicate file } message CreateRequestProto { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java index e62dd08afb..88e40ee0f9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java @@ -25,15 +25,18 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.EnumSet; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import com.google.common.base.Preconditions; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FsServerDefaults; @@ -241,7 +244,15 @@ private OutputStream checkAndMarkRunning() throws IOException { IOUtils.closeStream(fs.append(idPath)); fs.delete(idPath, true); } - final FSDataOutputStream fsout = fs.create(idPath, false); + + final FSDataOutputStream fsout = fs.newFSDataOutputStreamBuilder(idPath) + .replicate() + .setFlags(EnumSet.of(CreateFlag.CREATE)) + .build(); + + Preconditions.checkState(!fs.getFileStatus(idPath).isErasureCoded(), + "Id File should be a replicate file"); + // mark balancer idPath to be deleted during filesystem closure fs.deleteOnExit(idPath); if (write2IdFile) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java index 7bf291634d..a62cddd097 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java @@ -352,7 +352,7 @@ static HdfsFileStatus startFile( EnumSet flag, boolean createParent, short replication, long blockSize, FileEncryptionInfo feInfo, INode.BlocksMapUpdateInfo toRemoveBlocks, - String ecPolicyName, boolean logRetryEntry) + boolean shouldReplicate, String ecPolicyName, boolean logRetryEntry) throws IOException { assert fsn.hasWriteLock(); boolean overwrite = flag.contains(CreateFlag.OVERWRITE); @@ -386,7 +386,8 @@ static HdfsFileStatus startFile( FSDirMkdirOp.createAncestorDirectories(fsd, iip, permissions); if (parent != null) { iip = addFile(fsd, parent, iip.getLastLocalName(), permissions, - replication, blockSize, holder, clientMachine, ecPolicyName); + replication, blockSize, holder, clientMachine, shouldReplicate, + ecPolicyName); newNode = iip != null ? 
iip.getLastINode().asFile() : null; } if (newNode == null) { @@ -522,8 +523,8 @@ private static BlockInfo addBlock(FSDirectory fsd, String path, private static INodesInPath addFile( FSDirectory fsd, INodesInPath existing, byte[] localName, PermissionStatus permissions, short replication, long preferredBlockSize, - String clientName, String clientMachine, String ecPolicyName) - throws IOException { + String clientName, String clientMachine, boolean shouldReplicate, + String ecPolicyName) throws IOException { Preconditions.checkNotNull(existing); long modTime = now(); @@ -531,16 +532,18 @@ private static INodesInPath addFile( fsd.writeLock(); try { boolean isStriped = false; - ErasureCodingPolicy ecPolicy; - if (!StringUtils.isEmpty(ecPolicyName)) { - ecPolicy = FSDirErasureCodingOp.getErasureCodingPolicyByName( - fsd.getFSNamesystem(), ecPolicyName); - } else { - ecPolicy = FSDirErasureCodingOp.unprotectedGetErasureCodingPolicy( - fsd.getFSNamesystem(), existing); - } - if (ecPolicy != null) { - isStriped = true; + ErasureCodingPolicy ecPolicy = null; + if (!shouldReplicate) { + if (!StringUtils.isEmpty(ecPolicyName)) { + ecPolicy = FSDirErasureCodingOp.getErasureCodingPolicyByName( + fsd.getFSNamesystem(), ecPolicyName); + } else { + ecPolicy = FSDirErasureCodingOp.unprotectedGetErasureCodingPolicy( + fsd.getFSNamesystem(), existing); + } + if (ecPolicy != null) { + isStriped = true; + } } final BlockType blockType = isStriped ? BlockType.STRIPED : BlockType.CONTIGUOUS; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index 103437a508..afcc717b8c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -2225,6 +2225,13 @@ private HdfsFileStatus startFileInt(String src, throw new InvalidPathException(src); } + boolean shouldReplicate = flag.contains(CreateFlag.SHOULD_REPLICATE); + if (shouldReplicate && + (!org.apache.commons.lang.StringUtils.isEmpty(ecPolicyName))) { + throw new HadoopIllegalArgumentException("SHOULD_REPLICATE flag and " + + "ecPolicyName are exclusive parameters. 
Set both is not allowed!"); + } + FSPermissionChecker pc = getPermissionChecker(); INodesInPath iip = null; boolean skipSync = true; // until we do something that might create edits @@ -2240,7 +2247,9 @@ private HdfsFileStatus startFileInt(String src, iip = FSDirWriteFileOp.resolvePathForStartFile( dir, pc, src, flag, createParent); - if (!FSDirErasureCodingOp.hasErasureCodingPolicy(this, iip)) { + if (shouldReplicate || + (org.apache.commons.lang.StringUtils.isEmpty(ecPolicyName) && + !FSDirErasureCodingOp.hasErasureCodingPolicy(this, iip))) { blockManager.verifyReplication(src, replication, clientMachine); } @@ -2272,7 +2281,7 @@ private HdfsFileStatus startFileInt(String src, try { stat = FSDirWriteFileOp.startFile(this, iip, permissions, holder, clientMachine, flag, createParent, replication, blockSize, feInfo, - toRemoveBlocks, ecPolicyName, logRetryCache); + toRemoveBlocks, shouldReplicate, ecPolicyName, logRetryCache); } catch (IOException e) { skipSync = e instanceof StandbyException; throw e; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestErasureCodingPolicies.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestErasureCodingPolicies.java index 1aee929d83..a14b08ccbc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestErasureCodingPolicies.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestErasureCodingPolicies.java @@ -19,6 +19,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; @@ -46,6 +47,7 @@ import java.io.IOException; import java.security.PrivilegedExceptionAction; import java.util.Collection; +import java.util.EnumSet; import java.util.List; import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains; @@ -564,4 +566,60 @@ public void testFileLevelECPolicy() throws Exception { assertEquals(ecPolicyOnDir, fs.getErasureCodingPolicy(dirPath)); fs.delete(dirPath, true); } + + /** + * Enforce file as replicated file without regarding its parent's EC policy. + */ + @Test + public void testEnforceAsReplicatedFile() throws Exception { + final Path dirPath = new Path("/striped"); + final Path filePath = new Path(dirPath, "file"); + + fs.mkdirs(dirPath); + fs.setErasureCodingPolicy(dirPath, EC_POLICY.getName()); + + final String ecPolicyName = "RS-10-4-64k"; + fs.newFSDataOutputStreamBuilder(filePath).build().close(); + assertEquals(EC_POLICY, fs.getErasureCodingPolicy(filePath)); + fs.delete(filePath, true); + + fs.newFSDataOutputStreamBuilder(filePath) + .setEcPolicyName(ecPolicyName) + .build() + .close(); + assertEquals(ecPolicyName, fs.getErasureCodingPolicy(filePath).getName()); + fs.delete(filePath, true); + + try { + fs.newFSDataOutputStreamBuilder(filePath) + .setEcPolicyName(ecPolicyName) + .replicate() + .build().close(); + Assert.fail("shouldReplicate and ecPolicyName are exclusive " + + "parameters. Set both is not allowed."); + }catch (Exception e){ + GenericTestUtils.assertExceptionContains("shouldReplicate and " + + "ecPolicyName are exclusive parameters. 
Set both is not allowed!", e); + } + + try { + final DFSClient dfsClient = fs.getClient(); + dfsClient.create(filePath.toString(), null, + EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE, + CreateFlag.SHOULD_REPLICATE), false, (short) 1, 1024, null, 1024, + null, null, ecPolicyName); + Assert.fail("SHOULD_REPLICATE flag and ecPolicyName are exclusive " + + "parameters. Set both is not allowed."); + }catch (Exception e){ + GenericTestUtils.assertExceptionContains("SHOULD_REPLICATE flag and " + + "ecPolicyName are exclusive parameters. Set both is not allowed!", e); + } + + fs.newFSDataOutputStreamBuilder(filePath) + .replicate() + .build() + .close(); + assertNull(fs.getErasureCodingPolicy(filePath)); + fs.delete(dirPath, true); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java index e177da375a..167997e659 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java @@ -1946,7 +1946,9 @@ public void integrationTestWithStripedFile(Configuration conf) throws Exception public void testBalancerWithStripedFile() throws Exception { Configuration conf = new Configuration(); initConfWithStripe(conf); + NameNodeConnector.setWrite2IdFile(true); doTestBalancerWithStripedFile(conf); + NameNodeConnector.setWrite2IdFile(false); } private void doTestBalancerWithStripedFile(Configuration conf) throws Exception {
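
Usage note (not part of the patch): the sketch below shows the new builder API from the client side, mirroring testEnforceAsReplicatedFile above. It assumes an already-initialized DistributedFileSystem (for example from a MiniDFSCluster in a test); the class name, method name and paths are hypothetical, and "RS-10-4-64k" is simply the policy name the new test uses.

import java.io.IOException;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;

public class ShouldReplicateSketch {
  // Create a replicated file under a directory that has an EC policy set.
  static void createReplicatedUnderEcDir(DistributedFileSystem fs)
      throws IOException {
    Path ecDir = new Path("/ec-dir");
    Path replicatedFile = new Path(ecDir, "replicated-file");

    fs.mkdirs(ecDir);
    fs.setErasureCodingPolicy(ecDir, "RS-10-4-64k");

    // SHOULD_REPLICATE via the builder: the file stays a plain replicated
    // file even though its parent directory carries an erasure coding policy.
    try (FSDataOutputStream out =
        fs.newFSDataOutputStreamBuilder(replicatedFile).replicate().build()) {
      out.writeBytes("replicated despite the parent directory's EC policy");
    }

    // The file has no EC policy of its own, i.e. it is a contiguous file.
    assert fs.getErasureCodingPolicy(replicatedFile) == null;
  }
}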
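
Continuing the sketch above (same imports, same assumptions), this is the other path the builder now exposes, plus the guard added in build() that rejects combining the two settings:

  // Request an explicit EC policy by name, overriding the parent directory.
  static void explicitPolicyAndGuard(DistributedFileSystem fs)
      throws IOException {
    Path stripedFile = new Path("/ec-dir/striped-file");
    try (FSDataOutputStream out = fs.newFSDataOutputStreamBuilder(stripedFile)
        .setEcPolicyName("RS-10-4-64k")
        .build()) {
      out.writeBytes("striped with an explicitly requested policy");
    }

    // Setting both replicate() and setEcPolicyName() trips the
    // Preconditions.checkState() guard in build() before any RPC is made.
    try {
      fs.newFSDataOutputStreamBuilder(new Path("/ec-dir/invalid"))
          .setEcPolicyName("RS-10-4-64k")
          .replicate()
          .build();
    } catch (IllegalStateException expected) {
      // expected: "shouldReplicate and ecPolicyName are exclusive parameters. ..."
    }
  }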