HDFS-7766. Add a flag to WebHDFS op=CREATE to not respond with a 307 redirect (Ravi Prakash via aw)
This commit is contained in:
parent
6d043aa4cf
commit
4b0f55b6ea
@ -0,0 +1,49 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.web.resources;
|
||||
|
||||
/** Overwrite parameter. */
|
||||
public class NoRedirectParam extends BooleanParam {
|
||||
/** Parameter name. */
|
||||
public static final String NAME = "noredirect";
|
||||
/** Default parameter value. */
|
||||
public static final String DEFAULT = FALSE;
|
||||
|
||||
private static final Domain DOMAIN = new Domain(NAME);
|
||||
|
||||
/**
|
||||
* Constructor.
|
||||
* @param value the parameter value.
|
||||
*/
|
||||
public NoRedirectParam(final Boolean value) {
|
||||
super(DOMAIN, value);
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructor.
|
||||
* @param str a string representation of the parameter value.
|
||||
*/
|
||||
public NoRedirectParam(final String str) {
|
||||
this(DOMAIN.parse(str));
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getName() {
|
||||
return NAME;
|
||||
}
|
||||
}
|
@ -33,6 +33,7 @@
|
||||
import org.apache.hadoop.hdfs.web.resources.HttpOpParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.LengthParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.NamenodeAddressParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.NoRedirectParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.OffsetParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.OverwriteParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.PermissionParam;
|
||||
@ -110,6 +111,10 @@ boolean overwrite() {
|
||||
return new OverwriteParam(param(OverwriteParam.NAME)).getValue();
|
||||
}
|
||||
|
||||
boolean noredirect() {
|
||||
return new NoRedirectParam(param(NoRedirectParam.NAME)).getValue();
|
||||
}
|
||||
|
||||
Token<DelegationTokenIdentifier> delegationToken() throws IOException {
|
||||
String delegation = param(DelegationParam.NAME);
|
||||
final Token<DelegationTokenIdentifier> token = new
|
||||
|
@ -24,7 +24,12 @@
|
||||
import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_TYPE;
|
||||
import static io.netty.handler.codec.http.HttpHeaderNames.LOCATION;
|
||||
import static io.netty.handler.codec.http.HttpHeaderValues.CLOSE;
|
||||
import static io.netty.handler.codec.http.HttpHeaderNames.ACCEPT;
|
||||
import static io.netty.handler.codec.http.HttpHeaderNames.ACCESS_CONTROL_ALLOW_HEADERS;
|
||||
import static io.netty.handler.codec.http.HttpHeaderNames.ACCESS_CONTROL_MAX_AGE;
|
||||
import static io.netty.handler.codec.http.HttpHeaderValues.KEEP_ALIVE;
|
||||
import static io.netty.handler.codec.http.HttpMethod.GET;
|
||||
import static io.netty.handler.codec.http.HttpMethod.OPTIONS;
|
||||
import static io.netty.handler.codec.http.HttpMethod.POST;
|
||||
import static io.netty.handler.codec.http.HttpMethod.PUT;
|
||||
import static io.netty.handler.codec.http.HttpResponseStatus.CONTINUE;
|
||||
@ -59,7 +64,6 @@
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CreateFlag;
|
||||
import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
|
||||
import org.apache.hadoop.fs.ParentNotDirectoryException;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.hdfs.DFSClient;
|
||||
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
|
||||
@ -137,6 +141,9 @@ public void handle(ChannelHandlerContext ctx, HttpRequest req)
|
||||
} else if(GetOpParam.Op.GETFILECHECKSUM.name().equalsIgnoreCase(op)
|
||||
&& method == GET) {
|
||||
onGetFileChecksum(ctx);
|
||||
} else if(PutOpParam.Op.CREATE.name().equalsIgnoreCase(op)
|
||||
&& method == OPTIONS) {
|
||||
allowCORSOnCreate(ctx);
|
||||
} else {
|
||||
throw new IllegalArgumentException("Invalid operation " + op);
|
||||
}
|
||||
@ -182,6 +189,8 @@ private void onCreate(ChannelHandlerContext ctx)
|
||||
final URI uri = new URI(HDFS_URI_SCHEME, nnId, path, null, null);
|
||||
resp.headers().set(LOCATION, uri.toString());
|
||||
resp.headers().set(CONTENT_LENGTH, 0);
|
||||
resp.headers().set(ACCESS_CONTROL_ALLOW_ORIGIN, "*");
|
||||
|
||||
ctx.pipeline().replace(this, HdfsWriter.class.getSimpleName(),
|
||||
new HdfsWriter(dfsClient, out, resp));
|
||||
}
|
||||
@ -262,6 +271,21 @@ private void onGetFileChecksum(ChannelHandlerContext ctx) throws IOException {
|
||||
ctx.writeAndFlush(resp).addListener(ChannelFutureListener.CLOSE);
|
||||
}
|
||||
|
||||
//Accept preflighted CORS requests
|
||||
private void allowCORSOnCreate(ChannelHandlerContext ctx)
|
||||
throws IOException, URISyntaxException {
|
||||
DefaultHttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
|
||||
HttpHeaders headers = response.headers();
|
||||
headers.set(ACCESS_CONTROL_ALLOW_ORIGIN, "*");
|
||||
headers.set(ACCESS_CONTROL_ALLOW_HEADERS, ACCEPT);
|
||||
headers.set(ACCESS_CONTROL_ALLOW_METHODS, PUT);
|
||||
headers.set(ACCESS_CONTROL_MAX_AGE, 1728000);
|
||||
headers.set(CONTENT_LENGTH, 0);
|
||||
headers.set(CONNECTION, KEEP_ALIVE);
|
||||
|
||||
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
|
||||
}
|
||||
|
||||
private static void writeContinueHeader(ChannelHandlerContext ctx) {
|
||||
DefaultHttpResponse r = new DefaultFullHttpResponse(HTTP_1_1, CONTINUE,
|
||||
Unpooled.EMPTY_BUFFER);
|
||||
|
@ -357,13 +357,16 @@ public Response putRoot(
|
||||
@QueryParam(ExcludeDatanodesParam.NAME) @DefaultValue(ExcludeDatanodesParam.DEFAULT)
|
||||
final ExcludeDatanodesParam excludeDatanodes,
|
||||
@QueryParam(CreateFlagParam.NAME) @DefaultValue(CreateFlagParam.DEFAULT)
|
||||
final CreateFlagParam createFlagParam
|
||||
final CreateFlagParam createFlagParam,
|
||||
@QueryParam(NoRedirectParam.NAME) @DefaultValue(NoRedirectParam.DEFAULT)
|
||||
final NoRedirectParam noredirect
|
||||
) throws IOException, InterruptedException {
|
||||
return put(ugi, delegation, username, doAsUser, ROOT, op, destination,
|
||||
owner, group, permission, overwrite, bufferSize, replication,
|
||||
blockSize, modificationTime, accessTime, renameOptions, createParent,
|
||||
delegationTokenArgument, aclPermission, xattrName, xattrValue,
|
||||
xattrSetFlag, snapshotName, oldSnapshotName, excludeDatanodes, createFlagParam);
|
||||
xattrSetFlag, snapshotName, oldSnapshotName, excludeDatanodes,
|
||||
createFlagParam, noredirect);
|
||||
}
|
||||
|
||||
/** Handle HTTP PUT request. */
|
||||
@ -423,14 +426,16 @@ public Response put(
|
||||
@QueryParam(ExcludeDatanodesParam.NAME) @DefaultValue(ExcludeDatanodesParam.DEFAULT)
|
||||
final ExcludeDatanodesParam excludeDatanodes,
|
||||
@QueryParam(CreateFlagParam.NAME) @DefaultValue(CreateFlagParam.DEFAULT)
|
||||
final CreateFlagParam createFlagParam
|
||||
final CreateFlagParam createFlagParam,
|
||||
@QueryParam(NoRedirectParam.NAME) @DefaultValue(NoRedirectParam.DEFAULT)
|
||||
final NoRedirectParam noredirect
|
||||
) throws IOException, InterruptedException {
|
||||
|
||||
init(ugi, delegation, username, doAsUser, path, op, destination, owner,
|
||||
group, permission, overwrite, bufferSize, replication, blockSize,
|
||||
modificationTime, accessTime, renameOptions, delegationTokenArgument,
|
||||
aclPermission, xattrName, xattrValue, xattrSetFlag, snapshotName,
|
||||
oldSnapshotName, excludeDatanodes, createFlagParam);
|
||||
oldSnapshotName, excludeDatanodes, createFlagParam, noredirect);
|
||||
|
||||
return ugi.doAs(new PrivilegedExceptionAction<Response>() {
|
||||
@Override
|
||||
@ -442,7 +447,7 @@ public Response run() throws IOException, URISyntaxException {
|
||||
modificationTime, accessTime, renameOptions, createParent,
|
||||
delegationTokenArgument, aclPermission, xattrName, xattrValue,
|
||||
xattrSetFlag, snapshotName, oldSnapshotName, excludeDatanodes,
|
||||
createFlagParam);
|
||||
createFlagParam, noredirect);
|
||||
} finally {
|
||||
reset();
|
||||
}
|
||||
@ -477,7 +482,8 @@ private Response put(
|
||||
final SnapshotNameParam snapshotName,
|
||||
final OldSnapshotNameParam oldSnapshotName,
|
||||
final ExcludeDatanodesParam exclDatanodes,
|
||||
final CreateFlagParam createFlagParam
|
||||
final CreateFlagParam createFlagParam,
|
||||
final NoRedirectParam noredirectParam
|
||||
) throws IOException, URISyntaxException {
|
||||
|
||||
final Configuration conf = (Configuration)context.getAttribute(JspHelper.CURRENT_CONF);
|
||||
@ -491,8 +497,14 @@ private Response put(
|
||||
doAsUser, fullpath, op.getValue(), -1L, blockSize.getValue(conf),
|
||||
exclDatanodes.getValue(), permission, overwrite, bufferSize,
|
||||
replication, blockSize, createParent, createFlagParam);
|
||||
return Response.temporaryRedirect(uri).type(MediaType.APPLICATION_OCTET_STREAM).build();
|
||||
}
|
||||
if(!noredirectParam.getValue()) {
|
||||
return Response.temporaryRedirect(uri)
|
||||
.type(MediaType.APPLICATION_OCTET_STREAM).build();
|
||||
} else {
|
||||
final String js = JsonUtil.toJsonString("Location", uri);
|
||||
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
|
||||
}
|
||||
}
|
||||
case MKDIRS:
|
||||
{
|
||||
final boolean b = np.mkdirs(fullpath, permission.getFsPermission(), true);
|
||||
@ -635,10 +647,12 @@ public Response postRoot(
|
||||
@QueryParam(ExcludeDatanodesParam.NAME) @DefaultValue(ExcludeDatanodesParam.DEFAULT)
|
||||
final ExcludeDatanodesParam excludeDatanodes,
|
||||
@QueryParam(NewLengthParam.NAME) @DefaultValue(NewLengthParam.DEFAULT)
|
||||
final NewLengthParam newLength
|
||||
final NewLengthParam newLength,
|
||||
@QueryParam(NoRedirectParam.NAME) @DefaultValue(NoRedirectParam.DEFAULT)
|
||||
final NoRedirectParam noredirect
|
||||
) throws IOException, InterruptedException {
|
||||
return post(ugi, delegation, username, doAsUser, ROOT, op, concatSrcs,
|
||||
bufferSize, excludeDatanodes, newLength);
|
||||
bufferSize, excludeDatanodes, newLength, noredirect);
|
||||
}
|
||||
|
||||
/** Handle HTTP POST request. */
|
||||
@ -664,7 +678,9 @@ public Response post(
|
||||
@QueryParam(ExcludeDatanodesParam.NAME) @DefaultValue(ExcludeDatanodesParam.DEFAULT)
|
||||
final ExcludeDatanodesParam excludeDatanodes,
|
||||
@QueryParam(NewLengthParam.NAME) @DefaultValue(NewLengthParam.DEFAULT)
|
||||
final NewLengthParam newLength
|
||||
final NewLengthParam newLength,
|
||||
@QueryParam(NoRedirectParam.NAME) @DefaultValue(NoRedirectParam.DEFAULT)
|
||||
final NoRedirectParam noredirect
|
||||
) throws IOException, InterruptedException {
|
||||
|
||||
init(ugi, delegation, username, doAsUser, path, op, concatSrcs, bufferSize,
|
||||
@ -676,7 +692,7 @@ public Response run() throws IOException, URISyntaxException {
|
||||
try {
|
||||
return post(ugi, delegation, username, doAsUser,
|
||||
path.getAbsolutePath(), op, concatSrcs, bufferSize,
|
||||
excludeDatanodes, newLength);
|
||||
excludeDatanodes, newLength, noredirect);
|
||||
} finally {
|
||||
reset();
|
||||
}
|
||||
@ -694,7 +710,8 @@ private Response post(
|
||||
final ConcatSourcesParam concatSrcs,
|
||||
final BufferSizeParam bufferSize,
|
||||
final ExcludeDatanodesParam excludeDatanodes,
|
||||
final NewLengthParam newLength
|
||||
final NewLengthParam newLength,
|
||||
final NoRedirectParam noredirectParam
|
||||
) throws IOException, URISyntaxException {
|
||||
final NameNode namenode = (NameNode)context.getAttribute("name.node");
|
||||
final NamenodeProtocols np = getRPCServer(namenode);
|
||||
@ -705,7 +722,13 @@ private Response post(
|
||||
final URI uri = redirectURI(namenode, ugi, delegation, username,
|
||||
doAsUser, fullpath, op.getValue(), -1L, -1L,
|
||||
excludeDatanodes.getValue(), bufferSize);
|
||||
return Response.temporaryRedirect(uri).type(MediaType.APPLICATION_OCTET_STREAM).build();
|
||||
if(!noredirectParam.getValue()) {
|
||||
return Response.temporaryRedirect(uri)
|
||||
.type(MediaType.APPLICATION_OCTET_STREAM).build();
|
||||
} else {
|
||||
final String js = JsonUtil.toJsonString("Location", uri);
|
||||
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
|
||||
}
|
||||
}
|
||||
case CONCAT:
|
||||
{
|
||||
@ -762,11 +785,13 @@ public Response getRoot(
|
||||
@QueryParam(TokenKindParam.NAME) @DefaultValue(TokenKindParam.DEFAULT)
|
||||
final TokenKindParam tokenKind,
|
||||
@QueryParam(TokenServiceParam.NAME) @DefaultValue(TokenServiceParam.DEFAULT)
|
||||
final TokenServiceParam tokenService
|
||||
final TokenServiceParam tokenService,
|
||||
@QueryParam(NoRedirectParam.NAME) @DefaultValue(NoRedirectParam.DEFAULT)
|
||||
final NoRedirectParam noredirect
|
||||
) throws IOException, InterruptedException {
|
||||
return get(ugi, delegation, username, doAsUser, ROOT, op, offset, length,
|
||||
renewer, bufferSize, xattrNames, xattrEncoding, excludeDatanodes, fsAction,
|
||||
tokenKind, tokenService);
|
||||
tokenKind, tokenService, noredirect);
|
||||
}
|
||||
|
||||
/** Handle HTTP GET request. */
|
||||
@ -803,7 +828,9 @@ public Response get(
|
||||
@QueryParam(TokenKindParam.NAME) @DefaultValue(TokenKindParam.DEFAULT)
|
||||
final TokenKindParam tokenKind,
|
||||
@QueryParam(TokenServiceParam.NAME) @DefaultValue(TokenServiceParam.DEFAULT)
|
||||
final TokenServiceParam tokenService
|
||||
final TokenServiceParam tokenService,
|
||||
@QueryParam(NoRedirectParam.NAME) @DefaultValue(NoRedirectParam.DEFAULT)
|
||||
final NoRedirectParam noredirect
|
||||
) throws IOException, InterruptedException {
|
||||
|
||||
init(ugi, delegation, username, doAsUser, path, op, offset, length,
|
||||
@ -817,7 +844,7 @@ public Response run() throws IOException, URISyntaxException {
|
||||
return get(ugi, delegation, username, doAsUser,
|
||||
path.getAbsolutePath(), op, offset, length, renewer, bufferSize,
|
||||
xattrNames, xattrEncoding, excludeDatanodes, fsAction, tokenKind,
|
||||
tokenService);
|
||||
tokenService, noredirect);
|
||||
} finally {
|
||||
reset();
|
||||
}
|
||||
@ -841,7 +868,8 @@ private Response get(
|
||||
final ExcludeDatanodesParam excludeDatanodes,
|
||||
final FsActionParam fsAction,
|
||||
final TokenKindParam tokenKind,
|
||||
final TokenServiceParam tokenService
|
||||
final TokenServiceParam tokenService,
|
||||
final NoRedirectParam noredirectParam
|
||||
) throws IOException, URISyntaxException {
|
||||
final NameNode namenode = (NameNode)context.getAttribute("name.node");
|
||||
final Configuration conf = (Configuration) context
|
||||
@ -854,7 +882,13 @@ private Response get(
|
||||
final URI uri = redirectURI(namenode, ugi, delegation, username,
|
||||
doAsUser, fullpath, op.getValue(), offset.getValue(), -1L,
|
||||
excludeDatanodes.getValue(), offset, length, bufferSize);
|
||||
return Response.temporaryRedirect(uri).type(MediaType.APPLICATION_OCTET_STREAM).build();
|
||||
if(!noredirectParam.getValue()) {
|
||||
return Response.temporaryRedirect(uri)
|
||||
.type(MediaType.APPLICATION_OCTET_STREAM).build();
|
||||
} else {
|
||||
final String js = JsonUtil.toJsonString("Location", uri);
|
||||
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
|
||||
}
|
||||
}
|
||||
case GET_BLOCK_LOCATIONS:
|
||||
{
|
||||
@ -890,7 +924,13 @@ private Response get(
|
||||
{
|
||||
final URI uri = redirectURI(namenode, ugi, delegation, username, doAsUser,
|
||||
fullpath, op.getValue(), -1L, -1L, null);
|
||||
return Response.temporaryRedirect(uri).type(MediaType.APPLICATION_OCTET_STREAM).build();
|
||||
if(!noredirectParam.getValue()) {
|
||||
return Response.temporaryRedirect(uri)
|
||||
.type(MediaType.APPLICATION_OCTET_STREAM).build();
|
||||
} else {
|
||||
final String js = JsonUtil.toJsonString("Location", uri);
|
||||
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
|
||||
}
|
||||
}
|
||||
case GETDELEGATIONTOKEN:
|
||||
{
|
||||
|
@ -121,6 +121,7 @@ WebHDFS REST API
|
||||
* [Token Kind](#Token_Kind)
|
||||
* [Token Service](#Token_Service)
|
||||
* [Username](#Username)
|
||||
* [NoRedirect](#NoRedirect)
|
||||
|
||||
Document Conventions
|
||||
--------------------
|
||||
@ -325,15 +326,21 @@ File and Directory Operations
|
||||
|
||||
curl -i -X PUT "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=CREATE
|
||||
[&overwrite=<true |false>][&blocksize=<LONG>][&replication=<SHORT>]
|
||||
[&permission=<OCTAL>][&buffersize=<INT>]"
|
||||
[&permission=<OCTAL>][&buffersize=<INT>][&noredirect=<true|false>]"
|
||||
|
||||
The request is redirected to a datanode where the file data is to be written:
|
||||
Usually the request is redirected to a datanode where the file data is to be written.
|
||||
|
||||
HTTP/1.1 307 TEMPORARY_REDIRECT
|
||||
Location: http://<DATANODE>:<PORT>/webhdfs/v1/<PATH>?op=CREATE...
|
||||
Content-Length: 0
|
||||
|
||||
* Step 2: Submit another HTTP PUT request using the URL in the `Location` header with the file data to be written.
|
||||
However, if you do not want to be automatically redirected, you can set the noredirect flag.
|
||||
|
||||
HTTP/1.1 200 OK
|
||||
Content-Type: application/json
|
||||
{"Location":"http://<DATANODE>:<PORT>/webhdfs/v1/<PATH>?op=CREATE..."}
|
||||
|
||||
* Step 2: Submit another HTTP PUT request using the URL in the `Location` header (or the returned response in case you specified noredirect) with the file data to be written.
|
||||
|
||||
curl -i -X PUT -T <LOCAL_FILE> "http://<DATANODE>:<PORT>/webhdfs/v1/<PATH>?op=CREATE..."
|
||||
|
||||
@ -351,15 +358,22 @@ See also: [`overwrite`](#Overwrite), [`blocksize`](#Block_Size), [`replication`]
|
||||
|
||||
* Step 1: Submit a HTTP POST request without automatically following redirects and without sending the file data.
|
||||
|
||||
curl -i -X POST "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=APPEND[&buffersize=<INT>]"
|
||||
curl -i -X POST "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=APPEND[&buffersize=<INT>][&noredirect=<true|false>]"
|
||||
|
||||
The request is redirected to a datanode where the file data is to be appended:
|
||||
Usually the request is redirected to a datanode where the file data is to be appended:
|
||||
|
||||
HTTP/1.1 307 TEMPORARY_REDIRECT
|
||||
Location: http://<DATANODE>:<PORT>/webhdfs/v1/<PATH>?op=APPEND...
|
||||
Content-Length: 0
|
||||
|
||||
* Step 2: Submit another HTTP POST request using the URL in the `Location` header with the file data to be appended.
|
||||
However, if you do not want to be automatically redirected, you can set the noredirect flag.
|
||||
|
||||
HTTP/1.1 200 OK
|
||||
Content-Type: application/json
|
||||
{"Location":"http://<DATANODE>:<PORT>/webhdfs/v1/<PATH>?op=APPEND..."}
|
||||
|
||||
|
||||
* Step 2: Submit another HTTP POST request using the URL in the `Location` header (or the returned response in case you specified noredirect) with the file data to be appended.
|
||||
|
||||
curl -i -X POST -T <LOCAL_FILE> "http://<DATANODE>:<PORT>/webhdfs/v1/<PATH>?op=APPEND..."
|
||||
|
||||
@ -390,14 +404,20 @@ See also: [`sources`](#Sources), [FileSystem](../../api/org/apache/hadoop/fs/Fil
|
||||
* Submit a HTTP GET request with automatically following redirects.
|
||||
|
||||
curl -i -L "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=OPEN
|
||||
[&offset=<LONG>][&length=<LONG>][&buffersize=<INT>]"
|
||||
[&offset=<LONG>][&length=<LONG>][&buffersize=<INT>][&noredirect=<true|false>]"
|
||||
|
||||
The request is redirected to a datanode where the file data can be read:
|
||||
Usually the request is redirected to a datanode where the file data can be read:
|
||||
|
||||
HTTP/1.1 307 TEMPORARY_REDIRECT
|
||||
Location: http://<DATANODE>:<PORT>/webhdfs/v1/<PATH>?op=OPEN...
|
||||
Content-Length: 0
|
||||
|
||||
However if you do not want to be automatically redirected, you can set the noredirect flag.
|
||||
|
||||
HTTP/1.1 200 OK
|
||||
Content-Type: application/json
|
||||
{"Location":"http://<DATANODE>:<PORT>/webhdfs/v1/<PATH>?op=OPEN..."}
|
||||
|
||||
The client follows the redirect to the datanode and receives the file data:
|
||||
|
||||
HTTP/1.1 200 OK
|
||||
@ -618,12 +638,19 @@ See also: [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).getConten
|
||||
|
||||
curl -i "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=GETFILECHECKSUM"
|
||||
|
||||
The request is redirected to a datanode:
|
||||
Usually the request is redirected to a datanode:
|
||||
|
||||
HTTP/1.1 307 TEMPORARY_REDIRECT
|
||||
Location: http://<DATANODE>:<PORT>/webhdfs/v1/<PATH>?op=GETFILECHECKSUM...
|
||||
Content-Length: 0
|
||||
|
||||
However, if you do not want to be automatically redirected, you can set the noredirect flag.
|
||||
|
||||
HTTP/1.1 200 OK
|
||||
Content-Type: application/json
|
||||
{"Location":"http://<DATANODE>:<PORT>/webhdfs/v1/<PATH>?op=GETFILECHECKSUM..."}
|
||||
|
||||
|
||||
The client follows the redirect to the datanode and receives a [`FileChecksum` JSON object](#FileChecksum_JSON_Schema):
|
||||
|
||||
HTTP/1.1 200 OK
|
||||
@ -2043,3 +2070,15 @@ See also: [`GETDELEGATIONTOKEN`](#Get_Delegation_Token)
|
||||
| Syntax | Any string. |
|
||||
|
||||
See also: [Authentication](#Authentication)
|
||||
|
||||
### NoRedirect
|
||||
|
||||
| Name | `noredirect` |
|
||||
|:---- |:---- |
|
||||
| Description | Whether the response should return an HTTP 307 redirect or HTTP 200 OK. See [Create and Write to a File](#Create_and_Write_to_a_File). |
|
||||
| Type | boolean |
|
||||
| Default Value | false |
|
||||
| Valid Values | true |
|
||||
| Syntax | true |
|
||||
|
||||
See also: [Create and Write to a File](#Create_and_Write_to_a_File)
|
||||
|
@ -67,6 +67,7 @@
|
||||
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
|
||||
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem.WebHdfsInputStream;
|
||||
import org.apache.hadoop.hdfs.web.resources.LengthParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.NoRedirectParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.OffsetParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.Param;
|
||||
import org.apache.hadoop.io.retry.RetryPolicy;
|
||||
@ -78,6 +79,8 @@
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.log4j.Level;
|
||||
import org.codehaus.jettison.json.JSONException;
|
||||
import org.codehaus.jettison.json.JSONObject;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import org.mockito.internal.util.reflection.Whitebox;
|
||||
@ -818,7 +821,7 @@ public void testWebHdfsReadRetries() throws Exception {
|
||||
final Configuration conf = WebHdfsTestUtil.createConf();
|
||||
final Path dir = new Path("/testWebHdfsReadRetries");
|
||||
|
||||
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_KEY, true);
|
||||
conf.setBoolean(HdfsClientConfigKeys.Retry.POLICY_ENABLED_KEY, true);
|
||||
conf.setInt(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY, 1);
|
||||
conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1024*512);
|
||||
conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
|
||||
@ -949,4 +952,83 @@ public RetryAction shouldRetry(Exception e, int retries, int failovers,
|
||||
webIn.close();
|
||||
in.close();
|
||||
}
|
||||
|
||||
private void checkResponseContainsLocation(URL url, String TYPE)
|
||||
throws JSONException, IOException {
|
||||
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
|
||||
conn.setRequestMethod(TYPE);
|
||||
conn.setInstanceFollowRedirects(false);
|
||||
String response = IOUtils.toString(conn.getInputStream());
|
||||
LOG.info("Response was : " + response);
|
||||
Assert.assertEquals(
|
||||
"Response wasn't " + HttpURLConnection.HTTP_OK,
|
||||
HttpURLConnection.HTTP_OK, conn.getResponseCode());
|
||||
|
||||
JSONObject responseJson = new JSONObject(response);
|
||||
Assert.assertTrue("Response didn't give us a location. " + response,
|
||||
responseJson.has("Location"));
|
||||
|
||||
//Test that the DN allows CORS on Create
|
||||
if(TYPE.equals("CREATE")) {
|
||||
URL dnLocation = new URL(responseJson.getString("Location"));
|
||||
HttpURLConnection dnConn = (HttpURLConnection) dnLocation.openConnection();
|
||||
dnConn.setRequestMethod("OPTIONS");
|
||||
Assert.assertEquals("Datanode url : " + dnLocation + " didn't allow "
|
||||
+ "CORS", HttpURLConnection.HTTP_OK, dnConn.getResponseCode());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
/**
|
||||
* Test that when "&noredirect=true" is added to operations CREATE, APPEND,
|
||||
* OPEN, and GETFILECHECKSUM the response (which is usually a 307 temporary
|
||||
* redirect) is a 200 with JSON that contains the redirected location
|
||||
*/
|
||||
public void testWebHdfsNoRedirect() throws Exception {
|
||||
MiniDFSCluster cluster = null;
|
||||
final Configuration conf = WebHdfsTestUtil.createConf();
|
||||
try {
|
||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
|
||||
LOG.info("Started cluster");
|
||||
InetSocketAddress addr = cluster.getNameNode().getHttpAddress();
|
||||
|
||||
URL url = new URL("http", addr.getHostString(), addr.getPort(),
|
||||
WebHdfsFileSystem.PATH_PREFIX + "/testWebHdfsNoRedirectCreate" +
|
||||
"?op=CREATE" + Param.toSortedString("&", new NoRedirectParam(true)));
|
||||
LOG.info("Sending create request " + url);
|
||||
checkResponseContainsLocation(url, "PUT");
|
||||
|
||||
//Write a file that we can read
|
||||
final WebHdfsFileSystem fs = WebHdfsTestUtil.getWebHdfsFileSystem(
|
||||
conf, WebHdfsConstants.WEBHDFS_SCHEME);
|
||||
final String PATH = "/testWebHdfsNoRedirect";
|
||||
byte[] CONTENTS = new byte[1024];
|
||||
RANDOM.nextBytes(CONTENTS);
|
||||
try (OutputStream os = fs.create(new Path(PATH))) {
|
||||
os.write(CONTENTS);
|
||||
}
|
||||
url = new URL("http", addr.getHostString(), addr.getPort(),
|
||||
WebHdfsFileSystem.PATH_PREFIX + "/testWebHdfsNoRedirect" +
|
||||
"?op=OPEN" + Param.toSortedString("&", new NoRedirectParam(true)));
|
||||
LOG.info("Sending open request " + url);
|
||||
checkResponseContainsLocation(url, "GET");
|
||||
|
||||
url = new URL("http", addr.getHostString(), addr.getPort(),
|
||||
WebHdfsFileSystem.PATH_PREFIX + "/testWebHdfsNoRedirect" +
|
||||
"?op=GETFILECHECKSUM" + Param.toSortedString(
|
||||
"&", new NoRedirectParam(true)));
|
||||
LOG.info("Sending getfilechecksum request " + url);
|
||||
checkResponseContainsLocation(url, "GET");
|
||||
|
||||
url = new URL("http", addr.getHostString(), addr.getPort(),
|
||||
WebHdfsFileSystem.PATH_PREFIX + "/testWebHdfsNoRedirect" +
|
||||
"?op=APPEND" + Param.toSortedString("&", new NoRedirectParam(true)));
|
||||
LOG.info("Sending append request " + url);
|
||||
checkResponseContainsLocation(url, "POST");
|
||||
} finally {
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user