HDFS-8435. Support CreateFlag in WebHDFS. Contributed by Jakob Homan

This commit is contained in:
Chris Douglas 2015-08-18 17:32:48 -07:00
parent 71aedfabf3
commit 30e342a5d3
11 changed files with 241 additions and 94 deletions

View File

@ -53,7 +53,7 @@
* partial block.</li>
* </ol>
*
* Following combination is not valid and will result in
* Following combinations are not valid and will result in
* {@link HadoopIllegalArgumentException}:
* <ol>
* <li> APPEND|OVERWRITE</li>

View File

@ -40,6 +40,7 @@
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.DelegationTokenRenewer;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
@ -1166,6 +1167,25 @@ public FSDataOutputStream create(final Path f, final FsPermission permission,
).run();
}
@Override
@SuppressWarnings("deprecation")
public FSDataOutputStream createNonRecursive(final Path f,
final FsPermission permission, final EnumSet<CreateFlag> flag,
final int bufferSize, final short replication, final long blockSize,
final Progressable progress) throws IOException {
statistics.incrementWriteOps(1);
final HttpOpParam.Op op = PutOpParam.Op.CREATE;
return new FsPathOutputStreamRunner(op, f, bufferSize,
new PermissionParam(applyUMask(permission)),
new CreateFlagParam(flag),
new CreateParentParam(false),
new BufferSizeParam(bufferSize),
new ReplicationParam(replication),
new BlockSizeParam(blockSize)
).run();
}
@Override
public FSDataOutputStream append(final Path f, final int bufferSize,
final Progressable progress) throws IOException {

View File

@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.hdfs.web.resources;
import org.apache.hadoop.fs.CreateFlag;
import java.util.EnumSet;
/**
* CreateFlag enum.
*/
public class CreateFlagParam extends EnumSetParam<CreateFlag> {
public static final String NAME = "createflag";
public static final String DEFAULT = "";
private static final Domain<CreateFlag> DOMAIN = new Domain<CreateFlag>(
NAME, CreateFlag.class);
public CreateFlagParam(final EnumSet<CreateFlag> createFlags) {
super(DOMAIN, createFlags);
}
public CreateFlagParam(final String str) {
super(DOMAIN, DOMAIN.parse(str));
}
@Override
public String getName() {
return NAME;
}
}

View File

@ -22,7 +22,7 @@ public class CreateParentParam extends BooleanParam {
/** Parameter name. */
public static final String NAME = "createparent";
/** Default parameter value. */
public static final String DEFAULT = FALSE;
public static final String DEFAULT = TRUE;
private static final Domain DOMAIN = new Domain(NAME);

View File

@ -806,6 +806,8 @@ Release 2.8.0 - UNRELEASED
HDFS-8278. When computing max-size-to-move in Balancer, count only the
storage with remaining >= default block size. (szetszwo)
HDFS-8435. Support CreateFlag in WebHDFS. (Jakob Homan via cdouglas)
OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

View File

@ -20,11 +20,14 @@
import io.netty.handler.codec.http.QueryStringDecoder;
import org.apache.commons.io.Charsets;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.HAUtilClient;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.web.resources.BlockSizeParam;
import org.apache.hadoop.hdfs.web.resources.BufferSizeParam;
import org.apache.hadoop.hdfs.web.resources.CreateFlagParam;
import org.apache.hadoop.hdfs.web.resources.CreateParentParam;
import org.apache.hadoop.hdfs.web.resources.DelegationParam;
import org.apache.hadoop.hdfs.web.resources.DoAsParam;
import org.apache.hadoop.hdfs.web.resources.HttpOpParam;
@ -41,6 +44,7 @@
import java.io.IOException;
import java.net.URI;
import java.nio.charset.Charset;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
@ -122,6 +126,16 @@ Token<DelegationTokenIdentifier> delegationToken() throws IOException {
return token;
}
public boolean createParent() {
return new CreateParentParam(param(CreateParentParam.NAME)).getValue();
}
public EnumSet<CreateFlag> createFlag() {
String cf = decodeComponent(param(CreateFlagParam.NAME), Charsets.UTF_8);
return new CreateFlagParam(cf).getValue();
}
Configuration conf() {
return conf;
}

View File

@ -59,6 +59,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
import org.apache.hadoop.fs.ParentNotDirectoryException;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
@ -84,6 +85,9 @@ public class WebHdfsHandler extends SimpleChannelInboundHandler<HttpRequest> {
public static final String APPLICATION_JSON_UTF8 =
"application/json; charset=utf-8";
public static final EnumSet<CreateFlag> EMPTY_CREATE_FLAG =
EnumSet.noneOf(CreateFlag.class);
private final Configuration conf;
private final Configuration confForCreate;
@ -155,15 +159,24 @@ private void onCreate(ChannelHandlerContext ctx)
final short replication = params.replication();
final long blockSize = params.blockSize();
final FsPermission permission = params.permission();
final boolean createParent = params.createParent();
EnumSet<CreateFlag> flags = params.overwrite() ?
EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
: EnumSet.of(CreateFlag.CREATE);
EnumSet<CreateFlag> flags = params.createFlag();
if (flags.equals(EMPTY_CREATE_FLAG)) {
flags = params.overwrite() ?
EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
: EnumSet.of(CreateFlag.CREATE);
} else {
if(params.overwrite()) {
flags.add(CreateFlag.OVERWRITE);
}
}
final DFSClient dfsClient = newDfsClient(nnId, confForCreate);
OutputStream out = dfsClient.createWrappedOutputStream(dfsClient.create(
path, permission, flags, replication,
blockSize, null, bufferSize, null), null);
path, permission, flags, createParent, replication, blockSize, null,
bufferSize, null), null);
DefaultHttpResponse resp = new DefaultHttpResponse(HTTP_1_1, CREATED);
final URI uri = new URI(HDFS_URI_SCHEME, nnId, path, null, null);

View File

@ -356,13 +356,15 @@ public Response putRoot(
@QueryParam(OldSnapshotNameParam.NAME) @DefaultValue(OldSnapshotNameParam.DEFAULT)
final OldSnapshotNameParam oldSnapshotName,
@QueryParam(ExcludeDatanodesParam.NAME) @DefaultValue(ExcludeDatanodesParam.DEFAULT)
final ExcludeDatanodesParam excludeDatanodes
final ExcludeDatanodesParam excludeDatanodes,
@QueryParam(CreateFlagParam.NAME) @DefaultValue(CreateFlagParam.DEFAULT)
final CreateFlagParam createFlagParam
) throws IOException, InterruptedException {
return put(ugi, delegation, username, doAsUser, ROOT, op, destination,
owner, group, permission, overwrite, bufferSize, replication,
blockSize, modificationTime, accessTime, renameOptions, createParent,
delegationTokenArgument, aclPermission, xattrName, xattrValue,
xattrSetFlag, snapshotName, oldSnapshotName, excludeDatanodes);
xattrSetFlag, snapshotName, oldSnapshotName, excludeDatanodes, createFlagParam);
}
/** Handle HTTP PUT request. */
@ -420,14 +422,16 @@ public Response put(
@QueryParam(OldSnapshotNameParam.NAME) @DefaultValue(OldSnapshotNameParam.DEFAULT)
final OldSnapshotNameParam oldSnapshotName,
@QueryParam(ExcludeDatanodesParam.NAME) @DefaultValue(ExcludeDatanodesParam.DEFAULT)
final ExcludeDatanodesParam excludeDatanodes
final ExcludeDatanodesParam excludeDatanodes,
@QueryParam(CreateFlagParam.NAME) @DefaultValue(CreateFlagParam.DEFAULT)
final CreateFlagParam createFlagParam
) throws IOException, InterruptedException {
init(ugi, delegation, username, doAsUser, path, op, destination, owner,
group, permission, overwrite, bufferSize, replication, blockSize,
modificationTime, accessTime, renameOptions, delegationTokenArgument,
aclPermission, xattrName, xattrValue, xattrSetFlag, snapshotName,
oldSnapshotName, excludeDatanodes);
oldSnapshotName, excludeDatanodes, createFlagParam);
return ugi.doAs(new PrivilegedExceptionAction<Response>() {
@Override
@ -438,7 +442,8 @@ public Response run() throws IOException, URISyntaxException {
permission, overwrite, bufferSize, replication, blockSize,
modificationTime, accessTime, renameOptions, createParent,
delegationTokenArgument, aclPermission, xattrName, xattrValue,
xattrSetFlag, snapshotName, oldSnapshotName, excludeDatanodes);
xattrSetFlag, snapshotName, oldSnapshotName, excludeDatanodes,
createFlagParam);
} finally {
reset();
}
@ -472,7 +477,8 @@ private Response put(
final XAttrSetFlagParam xattrSetFlag,
final SnapshotNameParam snapshotName,
final OldSnapshotNameParam oldSnapshotName,
final ExcludeDatanodesParam exclDatanodes
final ExcludeDatanodesParam exclDatanodes,
final CreateFlagParam createFlagParam
) throws IOException, URISyntaxException {
final Configuration conf = (Configuration)context.getAttribute(JspHelper.CURRENT_CONF);
@ -485,7 +491,7 @@ private Response put(
final URI uri = redirectURI(namenode, ugi, delegation, username,
doAsUser, fullpath, op.getValue(), -1L, blockSize.getValue(conf),
exclDatanodes.getValue(), permission, overwrite, bufferSize,
replication, blockSize);
replication, blockSize, createParent, createFlagParam);
return Response.temporaryRedirect(uri).type(MediaType.APPLICATION_OCTET_STREAM).build();
}
case MKDIRS:

View File

@ -97,6 +97,7 @@ WebHDFS REST API
* [Access Time](#Access_Time)
* [Block Size](#Block_Size)
* [Buffer Size](#Buffer_Size)
* [Create Flag](#Create_Flag)
* [Create Parent](#Create_Parent)
* [Delegation](#Delegation)
* [Destination](#Destination)
@ -1632,14 +1633,30 @@ See also: [`CREATE`](#Create_and_Write_to_a_File)
See also: [`CREATE`](#Create_and_Write_to_a_File), [`APPEND`](#Append_to_a_File), [`OPEN`](#Open_and_Read_a_File)
### Create Flag
| Name | `createflag` |
|:---- |:---- |
| Description | Enum of possible flags to process while creating a file |
| Type | enumerated strings |
| Default Value | \<empty\> |
| Valid Values | Legal combinations of create, overwrite, append and sync_block |
| Syntax | See note below |
The following combinations are not valid:
* append,create
* create,append,overwrite
See also: [`CREATE`](#Create_and_Write_to_a_File)
### Create Parent
| Name | `createparent` |
|:---- |:---- |
| Description | If the parent directories do not exist, should they be created? |
| Type | boolean |
| Default Value | false |
| Valid Values | true |
| Default Value | true |
| Valid Values | true, false |
| Syntax | true |
See also: [`CREATESYMLINK`](#Create_a_Symbolic_Link)

View File

@ -33,6 +33,7 @@
import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.junit.Assume.assumeTrue;
@ -794,93 +795,94 @@ public void testFileCreationNonRecursive() throws IOException {
}
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
FileSystem fs = cluster.getFileSystem();
final Path path = new Path("/" + Time.now()
+ "-testFileCreationNonRecursive");
FSDataOutputStream out = null;
try {
IOException expectedException = null;
final String nonExistDir = "/non-exist-" + Time.now();
fs.delete(new Path(nonExistDir), true);
EnumSet<CreateFlag> createFlag = EnumSet.of(CreateFlag.CREATE);
// Create a new file in root dir, should succeed
out = createNonRecursive(fs, path, 1, createFlag);
out.close();
// Create a file when parent dir exists as file, should fail
try {
createNonRecursive(fs, new Path(path, "Create"), 1, createFlag);
} catch (IOException e) {
expectedException = e;
}
assertTrue("Create a file when parent directory exists as a file"
+ " should throw ParentNotDirectoryException ",
expectedException != null
&& expectedException instanceof ParentNotDirectoryException);
fs.delete(path, true);
// Create a file in a non-exist directory, should fail
final Path path2 = new Path(nonExistDir + "/testCreateNonRecursive");
expectedException = null;
try {
createNonRecursive(fs, path2, 1, createFlag);
} catch (IOException e) {
expectedException = e;
}
assertTrue("Create a file in a non-exist dir using"
+ " createNonRecursive() should throw FileNotFoundException ",
expectedException != null
&& expectedException instanceof FileNotFoundException);
EnumSet<CreateFlag> overwriteFlag =
EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE);
// Overwrite a file in root dir, should succeed
out = createNonRecursive(fs, path, 1, overwriteFlag);
out.close();
// Overwrite a file when parent dir exists as file, should fail
expectedException = null;
try {
createNonRecursive(fs, new Path(path, "Overwrite"), 1, overwriteFlag);
} catch (IOException e) {
expectedException = e;
}
assertTrue("Overwrite a file when parent directory exists as a file"
+ " should throw ParentNotDirectoryException ",
expectedException != null
&& expectedException instanceof ParentNotDirectoryException);
fs.delete(path, true);
// Overwrite a file in a non-exist directory, should fail
final Path path3 = new Path(nonExistDir + "/testOverwriteNonRecursive");
expectedException = null;
try {
createNonRecursive(fs, path3, 1, overwriteFlag);
} catch (IOException e) {
expectedException = e;
}
assertTrue("Overwrite a file in a non-exist dir using"
+ " createNonRecursive() should throw FileNotFoundException ",
expectedException != null
&& expectedException instanceof FileNotFoundException);
testFileCreationNonRecursive(fs);
} finally {
fs.close();
cluster.shutdown();
}
}
// creates a file using DistributedFileSystem.createNonRecursive()
static FSDataOutputStream createNonRecursive(FileSystem fs, Path name,
int repl, EnumSet<CreateFlag> flag) throws IOException {
System.out.println("createNonRecursive: Created " + name + " with " + repl
+ " replica.");
FSDataOutputStream stm = ((DistributedFileSystem) fs).createNonRecursive(
name, FsPermission.getDefault(), flag, fs.getConf().getInt(
CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096), (short) repl, blockSize, null);
return stm;
// Worker method for testing non-recursive. Extracted to allow other
// FileSystem implementations to re-use the tests
public static void testFileCreationNonRecursive(FileSystem fs) throws IOException {
final Path path = new Path("/" + Time.now()
+ "-testFileCreationNonRecursive");
FSDataOutputStream out = null;
IOException expectedException = null;
final String nonExistDir = "/non-exist-" + Time.now();
fs.delete(new Path(nonExistDir), true);
EnumSet<CreateFlag> createFlag = EnumSet.of(CreateFlag.CREATE);
// Create a new file in root dir, should succeed
assertNull(createNonRecursive(fs, path, 1, createFlag));
// Create a file when parent dir exists as file, should fail
expectedException = createNonRecursive(fs, new Path(path, "Create"), 1, createFlag);
assertTrue("Create a file when parent directory exists as a file"
+ " should throw ParentNotDirectoryException ",
expectedException != null
&& expectedException instanceof ParentNotDirectoryException);
fs.delete(path, true);
// Create a file in a non-exist directory, should fail
final Path path2 = new Path(nonExistDir + "/testCreateNonRecursive");
expectedException = createNonRecursive(fs, path2, 1, createFlag);
assertTrue("Create a file in a non-exist dir using"
+ " createNonRecursive() should throw FileNotFoundException ",
expectedException != null
&& expectedException instanceof FileNotFoundException);
EnumSet<CreateFlag> overwriteFlag =
EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE);
// Overwrite a file in root dir, should succeed
assertNull(createNonRecursive(fs, path, 1, overwriteFlag));
// Overwrite a file when parent dir exists as file, should fail
expectedException = createNonRecursive(fs, new Path(path, "Overwrite"), 1, overwriteFlag);
assertTrue("Overwrite a file when parent directory exists as a file"
+ " should throw ParentNotDirectoryException ",
expectedException != null
&& expectedException instanceof ParentNotDirectoryException);
fs.delete(path, true);
// Overwrite a file in a non-exist directory, should fail
final Path path3 = new Path(nonExistDir + "/testOverwriteNonRecursive");
expectedException = createNonRecursive(fs, path3, 1, overwriteFlag);
assertTrue("Overwrite a file in a non-exist dir using"
+ " createNonRecursive() should throw FileNotFoundException ",
expectedException != null
&& expectedException instanceof FileNotFoundException);
}
// Attempts to create and close a file using FileSystem.createNonRecursive(),
// catching and returning an exception if one occurs or null
// if the operation is successful.
@SuppressWarnings("deprecation")
static IOException createNonRecursive(FileSystem fs, Path name,
int repl, EnumSet<CreateFlag> flag) throws IOException {
try {
System.out.println("createNonRecursive: Attempting to create " + name +
" with " + repl + " replica.");
int bufferSize = fs.getConf()
.getInt(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096);
FSDataOutputStream stm = fs.createNonRecursive(name,
FsPermission.getDefault(), flag, bufferSize, (short) repl, blockSize,
null);
stm.close();
} catch (IOException e) {
return e;
}
return null;
}
/**
* Test that file data becomes available before file is closed.
*/
/**
* Test that file data becomes available before file is closed.
*/
@Test
public void testFileCreationSimulated() throws IOException {
simulatedStorage = true;

View File

@ -50,6 +50,7 @@
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.TestDFSClientRetries;
import org.apache.hadoop.hdfs.TestFileCreation;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper;
@ -417,6 +418,30 @@ public void testWebHdfsDeleteSnapshot() throws Exception {
}
}
@Test
public void testWebHdfsCreateNonRecursive() throws IOException, URISyntaxException {
MiniDFSCluster cluster = null;
final Configuration conf = WebHdfsTestUtil.createConf();
WebHdfsFileSystem webHdfs = null;
try {
cluster = new MiniDFSCluster.Builder(conf).build();
cluster.waitActive();
webHdfs = WebHdfsTestUtil.getWebHdfsFileSystem(conf, WebHdfsConstants.WEBHDFS_SCHEME);
TestFileCreation.testFileCreationNonRecursive(webHdfs);
} finally {
if(webHdfs != null) {
webHdfs.close();
}
if(cluster != null) {
cluster.shutdown();
}
}
}
/**
* Test snapshot rename through WebHdfs
*/