HDFS-10996. Ability to specify per-file EC policy at create time. Contributed by SammiChen.

This commit is contained in:
Andrew Wang 2017-04-12 12:27:34 -07:00
parent 966b1b5b44
commit a7312715a6
23 changed files with 200 additions and 74 deletions

View File

@ -1190,13 +1190,31 @@ public DFSOutputStream create(String src, FsPermission permission,
long blockSize, Progressable progress, int buffersize, long blockSize, Progressable progress, int buffersize,
ChecksumOpt checksumOpt, InetSocketAddress[] favoredNodes) ChecksumOpt checksumOpt, InetSocketAddress[] favoredNodes)
throws IOException { throws IOException {
return create(src, permission, flag, createParent, replication, blockSize,
progress, buffersize, checksumOpt, favoredNodes, null);
}
/**
* Same as {@link #create(String, FsPermission, EnumSet, boolean, short, long,
* Progressable, int, ChecksumOpt, InetSocketAddress[])} with the addition of
* ecPolicyName that is used to specify a specific erasure coding policy
* instead of inheriting any policy from this new file's parent directory.
* This policy will be persisted in HDFS. A value of null means inheriting
* parent groups' whatever policy.
*/
public DFSOutputStream create(String src, FsPermission permission,
EnumSet<CreateFlag> flag, boolean createParent, short replication,
long blockSize, Progressable progress, int buffersize,
ChecksumOpt checksumOpt, InetSocketAddress[] favoredNodes,
String ecPolicyName) throws IOException {
checkOpen(); checkOpen();
final FsPermission masked = applyUMask(permission); final FsPermission masked = applyUMask(permission);
LOG.debug("{}: masked={}", src, masked); LOG.debug("{}: masked={}", src, masked);
final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this, final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this,
src, masked, flag, createParent, replication, blockSize, progress, src, masked, flag, createParent, replication, blockSize, progress,
dfsClientConf.createChecksum(checksumOpt), dfsClientConf.createChecksum(checksumOpt),
getFavoredNodesStr(favoredNodes)); getFavoredNodesStr(favoredNodes), ecPolicyName);
beginFileLease(result.getFileId(), result); beginFileLease(result.getFileId(), result);
return result; return result;
} }
@ -1249,7 +1267,8 @@ public DFSOutputStream primitiveCreate(String src, FsPermission absPermission,
if (result == null) { if (result == null) {
DataChecksum checksum = dfsClientConf.createChecksum(checksumOpt); DataChecksum checksum = dfsClientConf.createChecksum(checksumOpt);
result = DFSOutputStream.newStreamForCreate(this, src, absPermission, result = DFSOutputStream.newStreamForCreate(this, src, absPermission,
flag, createParent, replication, blockSize, progress, checksum, null); flag, createParent, replication, blockSize, progress, checksum,
null, null);
} }
beginFileLease(result.getFileId(), result); beginFileLease(result.getFileId(), result);
return result; return result;

View File

@ -255,7 +255,8 @@ protected DFSOutputStream(DFSClient dfsClient, String src,
static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src, static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
FsPermission masked, EnumSet<CreateFlag> flag, boolean createParent, FsPermission masked, EnumSet<CreateFlag> flag, boolean createParent,
short replication, long blockSize, Progressable progress, short replication, long blockSize, Progressable progress,
DataChecksum checksum, String[] favoredNodes) throws IOException { DataChecksum checksum, String[] favoredNodes, String ecPolicyName)
throws IOException {
try (TraceScope ignored = try (TraceScope ignored =
dfsClient.newPathTraceScope("newStreamForCreate", src)) { dfsClient.newPathTraceScope("newStreamForCreate", src)) {
HdfsFileStatus stat = null; HdfsFileStatus stat = null;
@ -269,7 +270,7 @@ static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
try { try {
stat = dfsClient.namenode.create(src, masked, dfsClient.clientName, stat = dfsClient.namenode.create(src, masked, dfsClient.clientName,
new EnumSetWritable<>(flag), createParent, replication, new EnumSetWritable<>(flag), createParent, replication,
blockSize, SUPPORTED_CRYPTO_VERSIONS); blockSize, SUPPORTED_CRYPTO_VERSIONS, ecPolicyName);
break; break;
} catch (RemoteException re) { } catch (RemoteException re) {
IOException e = re.unwrapRemoteException( IOException e = re.unwrapRemoteException(

View File

@ -100,6 +100,8 @@
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import javax.annotation.Nonnull;
/**************************************************************** /****************************************************************
* Implementation of the abstract FileSystem for the DFS system. * Implementation of the abstract FileSystem for the DFS system.
* This object is the way end-user code interacts with a Hadoop * This object is the way end-user code interacts with a Hadoop
@ -456,13 +458,18 @@ public FSDataOutputStream next(final FileSystem fs, final Path p)
* at the creation time only. And with favored nodes, blocks will be pinned * at the creation time only. And with favored nodes, blocks will be pinned
* on the datanodes to prevent balancing move the block. HDFS could move the * on the datanodes to prevent balancing move the block. HDFS could move the
* blocks during replication, to move the blocks from favored nodes. A value * blocks during replication, to move the blocks from favored nodes. A value
* of null means no favored nodes for this create * of null means no favored nodes for this create.
* Another addition is ecPolicyName. A non-null ecPolicyName specifies an
* explicit erasure coding policy for this file, overriding the inherited
* policy. A null ecPolicyName means the file will inherit its EC policy from
* an ancestor (the default).
*/ */
private HdfsDataOutputStream create(final Path f, private HdfsDataOutputStream create(final Path f,
final FsPermission permission, EnumSet<CreateFlag> flag, final FsPermission permission, final EnumSet<CreateFlag> flag,
final int bufferSize, final short replication, final long blockSize, final int bufferSize, final short replication, final long blockSize,
final Progressable progress, final ChecksumOpt checksumOpt, final Progressable progress, final ChecksumOpt checksumOpt,
final InetSocketAddress[] favoredNodes) throws IOException { final InetSocketAddress[] favoredNodes, final String ecPolicyName)
throws IOException {
statistics.incrementWriteOps(1); statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.CREATE); storageStatistics.incrementOpCounter(OpType.CREATE);
Path absF = fixRelativePart(f); Path absF = fixRelativePart(f);
@ -471,7 +478,7 @@ private HdfsDataOutputStream create(final Path f,
public HdfsDataOutputStream doCall(final Path p) throws IOException { public HdfsDataOutputStream doCall(final Path p) throws IOException {
final DFSOutputStream out = dfs.create(getPathName(f), permission, final DFSOutputStream out = dfs.create(getPathName(f), permission,
flag, true, replication, blockSize, progress, bufferSize, flag, true, replication, blockSize, progress, bufferSize,
checksumOpt, favoredNodes); checksumOpt, favoredNodes, ecPolicyName);
return dfs.createWrappedOutputStream(out, statistics); return dfs.createWrappedOutputStream(out, statistics);
} }
@Override @Override
@ -480,7 +487,7 @@ public HdfsDataOutputStream next(final FileSystem fs, final Path p)
if (fs instanceof DistributedFileSystem) { if (fs instanceof DistributedFileSystem) {
DistributedFileSystem myDfs = (DistributedFileSystem)fs; DistributedFileSystem myDfs = (DistributedFileSystem)fs;
return myDfs.create(p, permission, flag, bufferSize, replication, return myDfs.create(p, permission, flag, bufferSize, replication,
blockSize, progress, checksumOpt, favoredNodes); blockSize, progress, checksumOpt, favoredNodes, ecPolicyName);
} }
throw new UnsupportedOperationException("Cannot create with" + throw new UnsupportedOperationException("Cannot create with" +
" favoredNodes through a symlink to a non-DistributedFileSystem: " " favoredNodes through a symlink to a non-DistributedFileSystem: "
@ -2645,6 +2652,7 @@ public static class HdfsDataOutputStreamBuilder
extends FSDataOutputStreamBuilder { extends FSDataOutputStreamBuilder {
private final DistributedFileSystem dfs; private final DistributedFileSystem dfs;
private InetSocketAddress[] favoredNodes = null; private InetSocketAddress[] favoredNodes = null;
private String ecPolicyName = null;
public HdfsDataOutputStreamBuilder(DistributedFileSystem dfs, Path path) { public HdfsDataOutputStreamBuilder(DistributedFileSystem dfs, Path path) {
super(dfs, path); super(dfs, path);
@ -2656,17 +2664,29 @@ protected InetSocketAddress[] getFavoredNodes() {
} }
public HdfsDataOutputStreamBuilder setFavoredNodes( public HdfsDataOutputStreamBuilder setFavoredNodes(
final InetSocketAddress[] nodes) { @Nonnull final InetSocketAddress[] nodes) {
Preconditions.checkNotNull(nodes); Preconditions.checkNotNull(nodes);
favoredNodes = nodes.clone(); favoredNodes = nodes.clone();
return this; return this;
} }
protected String getEcPolicyName() {
return ecPolicyName;
}
public HdfsDataOutputStreamBuilder setEcPolicyName(
@Nonnull final String policyName) {
Preconditions.checkNotNull(policyName);
ecPolicyName = policyName;
return this;
}
@Override @Override
public HdfsDataOutputStream build() throws IOException { public HdfsDataOutputStream build() throws IOException {
return dfs.create(getPath(), getPermission(), getFlags(), return dfs.create(getPath(), getPermission(), getFlags(),
getBufferSize(), getReplication(), getBlockSize(), getBufferSize(), getReplication(), getBlockSize(),
getProgress(), getChecksumOpt(), getFavoredNodes()); getProgress(), getChecksumOpt(), getFavoredNodes(),
getEcPolicyName());
} }
} }

View File

@ -160,6 +160,10 @@ LocatedBlocks getBlockLocations(String src, long offset, long length)
* @param replication block replication factor. * @param replication block replication factor.
* @param blockSize maximum block size. * @param blockSize maximum block size.
* @param supportedVersions CryptoProtocolVersions supported by the client * @param supportedVersions CryptoProtocolVersions supported by the client
* @param ecPolicyName the name of erasure coding policy. A null value means
* this file will inherit its parent directory's policy,
* either traditional replication or erasure coding
* policy.
* *
* @return the status of the created file, it could be null if the server * @return the status of the created file, it could be null if the server
* doesn't support returning the file status * doesn't support returning the file status
@ -193,7 +197,7 @@ LocatedBlocks getBlockLocations(String src, long offset, long length)
HdfsFileStatus create(String src, FsPermission masked, HdfsFileStatus create(String src, FsPermission masked,
String clientName, EnumSetWritable<CreateFlag> flag, String clientName, EnumSetWritable<CreateFlag> flag,
boolean createParent, short replication, long blockSize, boolean createParent, short replication, long blockSize,
CryptoProtocolVersion[] supportedVersions) CryptoProtocolVersion[] supportedVersions, String ecPolicyName)
throws IOException; throws IOException;
/** /**

View File

@ -285,7 +285,7 @@ public FsServerDefaults getServerDefaults() throws IOException {
public HdfsFileStatus create(String src, FsPermission masked, public HdfsFileStatus create(String src, FsPermission masked,
String clientName, EnumSetWritable<CreateFlag> flag, String clientName, EnumSetWritable<CreateFlag> flag,
boolean createParent, short replication, long blockSize, boolean createParent, short replication, long blockSize,
CryptoProtocolVersion[] supportedVersions) CryptoProtocolVersion[] supportedVersions, String ecPolicyName)
throws IOException { throws IOException {
CreateRequestProto.Builder builder = CreateRequestProto.newBuilder() CreateRequestProto.Builder builder = CreateRequestProto.newBuilder()
.setSrc(src) .setSrc(src)
@ -295,6 +295,9 @@ public HdfsFileStatus create(String src, FsPermission masked,
.setCreateParent(createParent) .setCreateParent(createParent)
.setReplication(replication) .setReplication(replication)
.setBlockSize(blockSize); .setBlockSize(blockSize);
if (ecPolicyName != null) {
builder.setEcPolicyName(ecPolicyName);
}
FsPermission unmasked = masked.getUnmasked(); FsPermission unmasked = masked.getUnmasked();
if (unmasked != null) { if (unmasked != null) {
builder.setUnmasked(PBHelperClient.convert(unmasked)); builder.setUnmasked(PBHelperClient.convert(unmasked));

View File

@ -80,6 +80,7 @@ message CreateRequestProto {
required uint64 blockSize = 7; required uint64 blockSize = 7;
repeated CryptoProtocolVersionProto cryptoProtocolVersion = 8; repeated CryptoProtocolVersionProto cryptoProtocolVersion = 8;
optional FsPermissionProto unmasked = 9; optional FsPermissionProto unmasked = 9;
optional string ecPolicyName = 10;
} }
message CreateResponseProto { message CreateResponseProto {

View File

@ -424,7 +424,8 @@ public CreateResponseProto create(RpcController controller,
PBHelperClient.convertCreateFlag(req.getCreateFlag()), req.getCreateParent(), PBHelperClient.convertCreateFlag(req.getCreateFlag()), req.getCreateParent(),
(short) req.getReplication(), req.getBlockSize(), (short) req.getReplication(), req.getBlockSize(),
PBHelperClient.convertCryptoProtocolVersions( PBHelperClient.convertCryptoProtocolVersions(
req.getCryptoProtocolVersionList())); req.getCryptoProtocolVersionList()),
req.getEcPolicyName());
if (result != null) { if (result != null) {
return CreateResponseProto.newBuilder().setFs(PBHelperClient.convert(result)) return CreateResponseProto.newBuilder().setFs(PBHelperClient.convert(result))

View File

@ -58,6 +58,39 @@ final class FSDirErasureCodingOp {
*/ */
private FSDirErasureCodingOp() {} private FSDirErasureCodingOp() {}
/**
* Check if the ecPolicyName is valid and enabled, return the corresponding
* EC policy if is.
* @param fsn namespace
* @param ecPolicyName name of EC policy to be checked
* @return an erasure coding policy if ecPolicyName is valid and enabled
* @throws IOException
*/
static ErasureCodingPolicy getErasureCodingPolicyByName(
final FSNamesystem fsn, final String ecPolicyName) throws IOException {
assert fsn.hasReadLock();
ErasureCodingPolicy ecPolicy = fsn.getErasureCodingPolicyManager()
.getEnabledPolicyByName(ecPolicyName);
if (ecPolicy == null) {
final String sysPolicies =
Arrays.asList(
fsn.getErasureCodingPolicyManager().getEnabledPolicies())
.stream()
.map(ErasureCodingPolicy::getName)
.collect(Collectors.joining(", "));
final String message = String.format("Policy '%s' does not match any " +
"enabled erasure" +
" coding policies: [%s]. The set of enabled erasure coding " +
"policies can be configured at '%s'.",
ecPolicyName,
sysPolicies,
DFSConfigKeys.DFS_NAMENODE_EC_POLICIES_ENABLED_KEY
);
throw new HadoopIllegalArgumentException(message);
}
return ecPolicy;
}
/** /**
* Set an erasure coding policy on the given path. * Set an erasure coding policy on the given path.
* *
@ -84,25 +117,8 @@ static HdfsFileStatus setErasureCodingPolicy(final FSNamesystem fsn,
List<XAttr> xAttrs; List<XAttr> xAttrs;
fsd.writeLock(); fsd.writeLock();
try { try {
ErasureCodingPolicy ecPolicy = fsn.getErasureCodingPolicyManager() ErasureCodingPolicy ecPolicy = getErasureCodingPolicyByName(fsn,
.getEnabledPolicyByName(ecPolicyName); ecPolicyName);
if (ecPolicy == null) {
final String sysPolicies =
Arrays.asList(
fsn.getErasureCodingPolicyManager().getEnabledPolicies())
.stream()
.map(ErasureCodingPolicy::getName)
.collect(Collectors.joining(", "));
final String message = String.format("Policy '%s' does not match any " +
"enabled erasure" +
" coding policies: [%s]. The set of enabled erasure coding " +
"policies can be configured at '%s'.",
ecPolicyName,
sysPolicies,
DFSConfigKeys.DFS_NAMENODE_EC_POLICIES_ENABLED_KEY
);
throw new HadoopIllegalArgumentException(message);
}
iip = fsd.resolvePath(pc, src, DirOp.WRITE_LINK); iip = fsd.resolvePath(pc, src, DirOp.WRITE_LINK);
// Write access is required to set erasure coding policy // Write access is required to set erasure coding policy
if (fsd.isPermissionEnabled()) { if (fsd.isPermissionEnabled()) {

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.namenode; package org.apache.hadoop.hdfs.server.namenode;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.hdfs.AddBlockFlag; import org.apache.hadoop.hdfs.AddBlockFlag;
import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.CreateFlag;
@ -351,7 +352,7 @@ static HdfsFileStatus startFile(
EnumSet<CreateFlag> flag, boolean createParent, EnumSet<CreateFlag> flag, boolean createParent,
short replication, long blockSize, short replication, long blockSize,
FileEncryptionInfo feInfo, INode.BlocksMapUpdateInfo toRemoveBlocks, FileEncryptionInfo feInfo, INode.BlocksMapUpdateInfo toRemoveBlocks,
boolean logRetryEntry) String ecPolicyName, boolean logRetryEntry)
throws IOException { throws IOException {
assert fsn.hasWriteLock(); assert fsn.hasWriteLock();
boolean overwrite = flag.contains(CreateFlag.OVERWRITE); boolean overwrite = flag.contains(CreateFlag.OVERWRITE);
@ -385,7 +386,7 @@ static HdfsFileStatus startFile(
FSDirMkdirOp.createAncestorDirectories(fsd, iip, permissions); FSDirMkdirOp.createAncestorDirectories(fsd, iip, permissions);
if (parent != null) { if (parent != null) {
iip = addFile(fsd, parent, iip.getLastLocalName(), permissions, iip = addFile(fsd, parent, iip.getLastLocalName(), permissions,
replication, blockSize, holder, clientMachine); replication, blockSize, holder, clientMachine, ecPolicyName);
newNode = iip != null ? iip.getLastINode().asFile() : null; newNode = iip != null ? iip.getLastINode().asFile() : null;
} }
if (newNode == null) { if (newNode == null) {
@ -521,7 +522,7 @@ private static BlockInfo addBlock(FSDirectory fsd, String path,
private static INodesInPath addFile( private static INodesInPath addFile(
FSDirectory fsd, INodesInPath existing, byte[] localName, FSDirectory fsd, INodesInPath existing, byte[] localName,
PermissionStatus permissions, short replication, long preferredBlockSize, PermissionStatus permissions, short replication, long preferredBlockSize,
String clientName, String clientMachine) String clientName, String clientMachine, String ecPolicyName)
throws IOException { throws IOException {
Preconditions.checkNotNull(existing); Preconditions.checkNotNull(existing);
@ -530,8 +531,14 @@ private static INodesInPath addFile(
fsd.writeLock(); fsd.writeLock();
try { try {
boolean isStriped = false; boolean isStriped = false;
ErasureCodingPolicy ecPolicy = FSDirErasureCodingOp. ErasureCodingPolicy ecPolicy;
unprotectedGetErasureCodingPolicy(fsd.getFSNamesystem(), existing); if (!StringUtils.isEmpty(ecPolicyName)) {
ecPolicy = FSDirErasureCodingOp.getErasureCodingPolicyByName(
fsd.getFSNamesystem(), ecPolicyName);
} else {
ecPolicy = FSDirErasureCodingOp.unprotectedGetErasureCodingPolicy(
fsd.getFSNamesystem(), existing);
}
if (ecPolicy != null) { if (ecPolicy != null) {
isStriped = true; isStriped = true;
} }

View File

@ -2181,13 +2181,13 @@ CryptoProtocolVersion chooseProtocolVersion(
HdfsFileStatus startFile(String src, PermissionStatus permissions, HdfsFileStatus startFile(String src, PermissionStatus permissions,
String holder, String clientMachine, EnumSet<CreateFlag> flag, String holder, String clientMachine, EnumSet<CreateFlag> flag,
boolean createParent, short replication, long blockSize, boolean createParent, short replication, long blockSize,
CryptoProtocolVersion[] supportedVersions, boolean logRetryCache) CryptoProtocolVersion[] supportedVersions, String ecPolicyName,
throws IOException { boolean logRetryCache) throws IOException {
HdfsFileStatus status; HdfsFileStatus status;
try { try {
status = startFileInt(src, permissions, holder, clientMachine, flag, status = startFileInt(src, permissions, holder, clientMachine, flag,
createParent, replication, blockSize, supportedVersions, createParent, replication, blockSize, supportedVersions, ecPolicyName,
logRetryCache); logRetryCache);
} catch (AccessControlException e) { } catch (AccessControlException e) {
logAuditEvent(false, "create", src); logAuditEvent(false, "create", src);
@ -2201,8 +2201,7 @@ private HdfsFileStatus startFileInt(String src,
PermissionStatus permissions, String holder, String clientMachine, PermissionStatus permissions, String holder, String clientMachine,
EnumSet<CreateFlag> flag, boolean createParent, short replication, EnumSet<CreateFlag> flag, boolean createParent, short replication,
long blockSize, CryptoProtocolVersion[] supportedVersions, long blockSize, CryptoProtocolVersion[] supportedVersions,
boolean logRetryCache) String ecPolicyName, boolean logRetryCache) throws IOException {
throws IOException {
if (NameNode.stateChangeLog.isDebugEnabled()) { if (NameNode.stateChangeLog.isDebugEnabled()) {
StringBuilder builder = new StringBuilder(); StringBuilder builder = new StringBuilder();
builder.append("DIR* NameSystem.startFile: src=").append(src) builder.append("DIR* NameSystem.startFile: src=").append(src)
@ -2270,9 +2269,8 @@ private HdfsFileStatus startFileInt(String src,
dir.writeLock(); dir.writeLock();
try { try {
stat = FSDirWriteFileOp.startFile(this, iip, permissions, holder, stat = FSDirWriteFileOp.startFile(this, iip, permissions, holder,
clientMachine, flag, createParent, clientMachine, flag, createParent, replication, blockSize, feInfo,
replication, blockSize, feInfo, toRemoveBlocks, ecPolicyName, logRetryCache);
toRemoveBlocks, logRetryCache);
} catch (IOException e) { } catch (IOException e) {
skipSync = e instanceof StandbyException; skipSync = e instanceof StandbyException;
throw e; throw e;

View File

@ -723,7 +723,7 @@ public FsServerDefaults getServerDefaults() throws IOException {
public HdfsFileStatus create(String src, FsPermission masked, public HdfsFileStatus create(String src, FsPermission masked,
String clientName, EnumSetWritable<CreateFlag> flag, String clientName, EnumSetWritable<CreateFlag> flag,
boolean createParent, short replication, long blockSize, boolean createParent, short replication, long blockSize,
CryptoProtocolVersion[] supportedVersions) CryptoProtocolVersion[] supportedVersions, String ecPolicyName)
throws IOException { throws IOException {
checkNNStartup(); checkNNStartup();
String clientMachine = getClientMachine(); String clientMachine = getClientMachine();
@ -747,7 +747,7 @@ public HdfsFileStatus create(String src, FsPermission masked,
.getShortUserName(), null, masked); .getShortUserName(), null, masked);
status = namesystem.startFile(src, perm, clientName, clientMachine, status = namesystem.startFile(src, perm, clientName, clientMachine,
flag.get(), createParent, replication, blockSize, supportedVersions, flag.get(), createParent, replication, blockSize, supportedVersions,
cacheEntry != null); ecPolicyName, cacheEntry != null);
} finally { } finally {
RetryCache.setState(cacheEntry, status != null, status); RetryCache.setState(cacheEntry, status != null, status);
} }

View File

@ -1966,7 +1966,7 @@ public static void createStripedFile(MiniDFSCluster cluster, Path file,
.create(file.toString(), new FsPermission((short)0755), .create(file.toString(), new FsPermission((short)0755),
dfs.getClient().getClientName(), dfs.getClient().getClientName(),
new EnumSetWritable<>(EnumSet.of(CreateFlag.CREATE)), new EnumSetWritable<>(EnumSet.of(CreateFlag.CREATE)),
false, (short)1, 128*1024*1024L, null); false, (short)1, 128*1024*1024L, null, null);
FSNamesystem ns = cluster.getNamesystem(); FSNamesystem ns = cluster.getNamesystem();
FSDirectory fsdir = ns.getFSDirectory(); FSDirectory fsdir = ns.getFSDirectory();

View File

@ -269,7 +269,8 @@ public Object answer(InvocationOnMock invocation)
.when(mockNN) .when(mockNN)
.create(anyString(), (FsPermission) anyObject(), anyString(), .create(anyString(), (FsPermission) anyObject(), anyString(),
(EnumSetWritable<CreateFlag>) anyObject(), anyBoolean(), (EnumSetWritable<CreateFlag>) anyObject(), anyBoolean(),
anyShort(), anyLong(), (CryptoProtocolVersion[]) anyObject()); anyShort(), anyLong(), (CryptoProtocolVersion[]) anyObject(),
anyObject());
final DFSClient client = new DFSClient(null, mockNN, conf, null); final DFSClient client = new DFSClient(null, mockNN, conf, null);
OutputStream os = client.create("testfile", true); OutputStream os = client.create("testfile", true);

View File

@ -899,7 +899,8 @@ private static void mockCreate(ClientProtocol mcp,
.when(mcp) .when(mcp)
.create(anyString(), (FsPermission) anyObject(), anyString(), .create(anyString(), (FsPermission) anyObject(), anyString(),
(EnumSetWritable<CreateFlag>) anyObject(), anyBoolean(), (EnumSetWritable<CreateFlag>) anyObject(), anyBoolean(),
anyShort(), anyLong(), (CryptoProtocolVersion[]) anyObject()); anyShort(), anyLong(), (CryptoProtocolVersion[]) anyObject(),
anyObject());
} }
// This test only uses mocks. Called from the end of an existing test to // This test only uses mocks. Called from the end of an existing test to

View File

@ -34,6 +34,7 @@
import org.apache.hadoop.io.erasurecode.ECSchema; import org.apache.hadoop.io.erasurecode.ECSchema;
import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
@ -519,4 +520,48 @@ public HdfsAdmin run() throws Exception {
noadmin.getErasureCodingPolicies(); noadmin.getErasureCodingPolicies();
superadmin.getErasureCodingPolicies(); superadmin.getErasureCodingPolicies();
} }
/**
* Test apply specific erasure coding policy on single file. Usually file's
* policy is inherited from its parent.
*/
@Test
public void testFileLevelECPolicy() throws Exception {
final Path dirPath = new Path("/striped");
final Path filePath0 = new Path(dirPath, "file0");
final Path filePath1 = new Path(dirPath, "file1");
fs.mkdirs(dirPath);
fs.setErasureCodingPolicy(dirPath, EC_POLICY.getName());
// null EC policy name value means inheriting parent directory's policy
fs.newFSDataOutputStreamBuilder(filePath0).build().close();
ErasureCodingPolicy ecPolicyOnFile = fs.getErasureCodingPolicy(filePath0);
assertEquals(EC_POLICY, ecPolicyOnFile);
// Test illegal EC policy name
final String illegalPolicyName = "RS-DEFAULT-1-2-64k";
try {
fs.newFSDataOutputStreamBuilder(filePath1)
.setEcPolicyName(illegalPolicyName).build().close();
Assert.fail("illegal erasure coding policy should not be found");
} catch (Exception e) {
GenericTestUtils.assertExceptionContains("Policy '" + illegalPolicyName
+ "' does not match any enabled erasure coding policies", e);
}
fs.delete(dirPath, true);
// Test create a file with a different EC policy than its parent directory
fs.mkdirs(dirPath);
final ErasureCodingPolicy ecPolicyOnDir =
SystemErasureCodingPolicies.getByID(
SystemErasureCodingPolicies.RS_3_2_POLICY_ID);
ecPolicyOnFile = EC_POLICY;
fs.setErasureCodingPolicy(dirPath, ecPolicyOnDir.getName());
fs.newFSDataOutputStreamBuilder(filePath0)
.setEcPolicyName(ecPolicyOnFile.getName()).build().close();
assertEquals(ecPolicyOnFile, fs.getErasureCodingPolicy(filePath0));
assertEquals(ecPolicyOnDir, fs.getErasureCodingPolicy(dirPath));
fs.delete(dirPath, true);
}
} }

View File

@ -1143,7 +1143,7 @@ private void doCreateTest(CreationMethod method) throws Exception {
try { try {
nnrpc.create(pathStr, new FsPermission((short)0755), "client", nnrpc.create(pathStr, new FsPermission((short)0755), "client",
new EnumSetWritable<CreateFlag>(EnumSet.of(CreateFlag.CREATE)), new EnumSetWritable<CreateFlag>(EnumSet.of(CreateFlag.CREATE)),
true, (short)1, 128*1024*1024L, null); true, (short)1, 128*1024*1024L, null, null);
fail("Should have thrown exception when creating '" fail("Should have thrown exception when creating '"
+ pathStr + "'" + " by " + method); + pathStr + "'" + " by " + method);
} catch (InvalidPathException ipe) { } catch (InvalidPathException ipe) {

View File

@ -363,7 +363,8 @@ public void testFactory() throws Exception {
.when(mcp) .when(mcp)
.create(anyString(), (FsPermission) anyObject(), anyString(), .create(anyString(), (FsPermission) anyObject(), anyString(),
(EnumSetWritable<CreateFlag>) anyObject(), anyBoolean(), (EnumSetWritable<CreateFlag>) anyObject(), anyBoolean(),
anyShort(), anyLong(), (CryptoProtocolVersion[]) anyObject()); anyShort(), anyLong(), (CryptoProtocolVersion[]) anyObject(),
anyObject());
final Configuration conf = new Configuration(); final Configuration conf = new Configuration();
final DFSClient c1 = createDFSClientAs(ugi[0], conf); final DFSClient c1 = createDFSClientAs(ugi[0], conf);

View File

@ -587,14 +587,16 @@ long executeOp(int daemonId, int inputIdx, String clientName)
throws IOException { throws IOException {
long start = Time.now(); long start = Time.now();
// dummyActionNoSynch(fileIdx); // dummyActionNoSynch(fileIdx);
clientProto.create(fileNames[daemonId][inputIdx], FsPermission.getDefault(), clientProto.create(fileNames[daemonId][inputIdx],
clientName, new EnumSetWritable<CreateFlag>(EnumSet FsPermission.getDefault(), clientName,
new EnumSetWritable<CreateFlag>(EnumSet
.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)), true, .of(CreateFlag.CREATE, CreateFlag.OVERWRITE)), true,
replication, BLOCK_SIZE, CryptoProtocolVersion.supported()); replication, BLOCK_SIZE, CryptoProtocolVersion.supported(), null);
long end = Time.now(); long end = Time.now();
for (boolean written = !closeUponCreate; !written; for (boolean written = !closeUponCreate; !written;
written = clientProto.complete(fileNames[daemonId][inputIdx], written = clientProto.complete(fileNames[daemonId][inputIdx],
clientName, null, HdfsConstants.GRANDFATHER_INODE_ID)); clientName, null, HdfsConstants.GRANDFATHER_INODE_ID)) {
};
return end-start; return end-start;
} }
@ -1139,7 +1141,7 @@ void generateInputs(int[] ignore) throws IOException {
String fileName = nameGenerator.getNextFileName("ThroughputBench"); String fileName = nameGenerator.getNextFileName("ThroughputBench");
clientProto.create(fileName, FsPermission.getDefault(), clientName, clientProto.create(fileName, FsPermission.getDefault(), clientName,
new EnumSetWritable<CreateFlag>(EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)), true, replication, new EnumSetWritable<CreateFlag>(EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)), true, replication,
BLOCK_SIZE, CryptoProtocolVersion.supported()); BLOCK_SIZE, CryptoProtocolVersion.supported(), null);
ExtendedBlock lastBlock = addBlocks(fileName, clientName); ExtendedBlock lastBlock = addBlocks(fileName, clientName);
clientProto.complete(fileName, clientName, lastBlock, HdfsConstants.GRANDFATHER_INODE_ID); clientProto.complete(fileName, clientName, lastBlock, HdfsConstants.GRANDFATHER_INODE_ID);
} }

View File

@ -85,7 +85,7 @@ public void testRetryAddBlockWhileInChooseTarget() throws Exception {
nn.create(src, FsPermission.getFileDefault(), nn.create(src, FsPermission.getFileDefault(),
"clientName", "clientName",
new EnumSetWritable<CreateFlag>(EnumSet.of(CreateFlag.CREATE)), new EnumSetWritable<CreateFlag>(EnumSet.of(CreateFlag.CREATE)),
true, (short)3, 1024, null); true, (short)3, 1024, null, null);
// start first addBlock() // start first addBlock()
LOG.info("Starting first addBlock for " + src); LOG.info("Starting first addBlock for " + src);
@ -157,7 +157,7 @@ public void testAddBlockRetryShouldReturnBlockWithLocations()
// create file // create file
nameNodeRpc.create(src, FsPermission.getFileDefault(), "clientName", nameNodeRpc.create(src, FsPermission.getFileDefault(), "clientName",
new EnumSetWritable<CreateFlag>(EnumSet.of(CreateFlag.CREATE)), true, new EnumSetWritable<CreateFlag>(EnumSet.of(CreateFlag.CREATE)), true,
(short) 3, 1024, null); (short) 3, 1024, null, null);
// start first addBlock() // start first addBlock()
LOG.info("Starting first addBlock for " + src); LOG.info("Starting first addBlock for " + src);
LocatedBlock lb1 = nameNodeRpc.addBlock(src, "clientName", null, null, LocatedBlock lb1 = nameNodeRpc.addBlock(src, "clientName", null, null,

View File

@ -109,7 +109,7 @@ private void doTestChooseTargetNormalCase() throws Exception {
// Create the file with client machine // Create the file with client machine
HdfsFileStatus fileStatus = namesystem.startFile(src, perm, HdfsFileStatus fileStatus = namesystem.startFile(src, perm,
clientMachine, clientMachine, EnumSet.of(CreateFlag.CREATE), true, clientMachine, clientMachine, EnumSet.of(CreateFlag.CREATE), true,
replication, DEFAULT_BLOCK_SIZE, null, false); replication, DEFAULT_BLOCK_SIZE, null, null, false);
//test chooseTarget for new file //test chooseTarget for new file
LocatedBlock locatedBlock = nameNodeRpc.addBlock(src, clientMachine, LocatedBlock locatedBlock = nameNodeRpc.addBlock(src, clientMachine,
@ -139,7 +139,7 @@ private void doTestChooseTargetSpecialCase() throws Exception {
// Create the file with client machine // Create the file with client machine
HdfsFileStatus fileStatus = namesystem.startFile(src, perm, HdfsFileStatus fileStatus = namesystem.startFile(src, perm,
clientMachine, clientMachine, EnumSet.of(CreateFlag.CREATE), true, clientMachine, clientMachine, EnumSet.of(CreateFlag.CREATE), true,
(short) 20, DEFAULT_BLOCK_SIZE, null, false); (short) 20, DEFAULT_BLOCK_SIZE, null, null, false);
//test chooseTarget for new file //test chooseTarget for new file
LocatedBlock locatedBlock = nameNodeRpc.addBlock(src, clientMachine, LocatedBlock locatedBlock = nameNodeRpc.addBlock(src, clientMachine,

View File

@ -138,7 +138,7 @@ private void testPlacement(String clientMachine,
// Create the file with client machine // Create the file with client machine
HdfsFileStatus fileStatus = namesystem.startFile(src, perm, HdfsFileStatus fileStatus = namesystem.startFile(src, perm,
clientMachine, clientMachine, EnumSet.of(CreateFlag.CREATE), true, clientMachine, clientMachine, EnumSet.of(CreateFlag.CREATE), true,
REPLICATION_FACTOR, DEFAULT_BLOCK_SIZE, null, false); REPLICATION_FACTOR, DEFAULT_BLOCK_SIZE, null, null, false);
LocatedBlock locatedBlock = nameNodeRpc.addBlock(src, clientMachine, LocatedBlock locatedBlock = nameNodeRpc.addBlock(src, clientMachine,
null, null, fileStatus.getFileId(), null, null); null, null, fileStatus.getFileId(), null, null);

View File

@ -224,14 +224,19 @@ public void testCreate() throws Exception {
newCall(); newCall();
HdfsFileStatus status = nnRpc.create(src, perm, "holder", HdfsFileStatus status = nnRpc.create(src, perm, "holder",
new EnumSetWritable<CreateFlag>(EnumSet.of(CreateFlag.CREATE)), true, new EnumSetWritable<CreateFlag>(EnumSet.of(CreateFlag.CREATE)), true,
(short) 1, BlockSize, null); (short) 1, BlockSize, null, null);
Assert.assertEquals(status, nnRpc.create(src, perm, "holder", new EnumSetWritable<CreateFlag>(EnumSet.of(CreateFlag.CREATE)), true, (short) 1, BlockSize, null)); Assert.assertEquals(status, nnRpc.create(src, perm, "holder",
Assert.assertEquals(status, nnRpc.create(src, perm, "holder", new EnumSetWritable<CreateFlag>(EnumSet.of(CreateFlag.CREATE)), true, (short) 1, BlockSize, null)); new EnumSetWritable<CreateFlag>(EnumSet.of(CreateFlag.CREATE)), true,
(short) 1, BlockSize, null, null));
Assert.assertEquals(status, nnRpc.create(src, perm, "holder",
new EnumSetWritable<CreateFlag>(EnumSet.of(CreateFlag.CREATE)), true,
(short) 1, BlockSize, null, null));
// A non-retried call fails // A non-retried call fails
newCall(); newCall();
try { try {
nnRpc.create(src, perm, "holder", new EnumSetWritable<CreateFlag>(EnumSet.of(CreateFlag.CREATE)), true, (short) 1, BlockSize, null); nnRpc.create(src, perm, "holder",
new EnumSetWritable<CreateFlag>(EnumSet.of(CreateFlag.CREATE)),
true, (short) 1, BlockSize, null, null);
Assert.fail("testCreate - expected exception is not thrown"); Assert.fail("testCreate - expected exception is not thrown");
} catch (IOException e) { } catch (IOException e) {
// expected // expected

View File

@ -405,7 +405,8 @@ void invoke() throws Exception {
FsPermission.getFileDefault(), client.getClientName(), FsPermission.getFileDefault(), client.getClientName(),
new EnumSetWritable<CreateFlag>(createFlag), false, DataNodes, new EnumSetWritable<CreateFlag>(createFlag), false, DataNodes,
BlockSize, BlockSize,
new CryptoProtocolVersion[] {CryptoProtocolVersion.ENCRYPTION_ZONES}); new CryptoProtocolVersion[] {CryptoProtocolVersion.ENCRYPTION_ZONES},
null);
} }
@Override @Override