Revert "HDFS-11156. Add new op GETFILEBLOCKLOCATIONS to WebHDFS REST API. Contributed by Weiwei Yang."
This reverts commit 7fcc73fc0d
.
Conflicts:
hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/WebHDFS.md
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFS.java
This commit is contained in:
parent
2f867115a8
commit
390c2b5df0
@ -22,7 +22,6 @@
|
|||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
import com.google.common.collect.Maps;
|
import com.google.common.collect.Maps;
|
||||||
import org.apache.hadoop.fs.BlockLocation;
|
|
||||||
import org.apache.hadoop.fs.ContentSummary;
|
import org.apache.hadoop.fs.ContentSummary;
|
||||||
import org.apache.hadoop.fs.ContentSummary.Builder;
|
import org.apache.hadoop.fs.ContentSummary.Builder;
|
||||||
import org.apache.hadoop.fs.FileChecksum;
|
import org.apache.hadoop.fs.FileChecksum;
|
||||||
@ -645,56 +644,4 @@ private static StorageType[] toStorageTypes(List<?> list) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static BlockLocation[] toBlockLocationArray(Map<?, ?> json)
|
|
||||||
throws IOException{
|
|
||||||
final Map<?, ?> rootmap =
|
|
||||||
(Map<?, ?>)json.get(BlockLocation.class.getSimpleName() + "s");
|
|
||||||
final List<?> array = JsonUtilClient.getList(rootmap,
|
|
||||||
BlockLocation.class.getSimpleName());
|
|
||||||
|
|
||||||
Preconditions.checkNotNull(array);
|
|
||||||
final BlockLocation[] locations = new BlockLocation[array.size()];
|
|
||||||
int i = 0;
|
|
||||||
for (Object object : array) {
|
|
||||||
final Map<?, ?> m = (Map<?, ?>) object;
|
|
||||||
locations[i++] = JsonUtilClient.toBlockLocation(m);
|
|
||||||
}
|
|
||||||
return locations;
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Convert a Json map to BlockLocation. **/
|
|
||||||
static BlockLocation toBlockLocation(Map<?, ?> m)
|
|
||||||
throws IOException{
|
|
||||||
if(m == null) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
long length = ((Number) m.get("length")).longValue();
|
|
||||||
long offset = ((Number) m.get("offset")).longValue();
|
|
||||||
boolean corrupt = Boolean.
|
|
||||||
getBoolean(m.get("corrupt").toString());
|
|
||||||
String[] storageIds = toStringArray(getList(m, "storageIds"));
|
|
||||||
String[] cachedHosts = toStringArray(getList(m, "cachedHosts"));
|
|
||||||
String[] hosts = toStringArray(getList(m, "hosts"));
|
|
||||||
String[] names = toStringArray(getList(m, "names"));
|
|
||||||
String[] topologyPaths = toStringArray(getList(m, "topologyPaths"));
|
|
||||||
StorageType[] storageTypes = toStorageTypeArray(
|
|
||||||
getList(m, "storageTypes"));
|
|
||||||
return new BlockLocation(names, hosts, cachedHosts,
|
|
||||||
topologyPaths, storageIds, storageTypes,
|
|
||||||
offset, length, corrupt);
|
|
||||||
}
|
|
||||||
|
|
||||||
static String[] toStringArray(List<?> list) {
|
|
||||||
if (list == null) {
|
|
||||||
return null;
|
|
||||||
} else {
|
|
||||||
final String[] array = new String[list.size()];
|
|
||||||
int i = 0;
|
|
||||||
for (Object object : list) {
|
|
||||||
array[i++] = object.toString();
|
|
||||||
}
|
|
||||||
return array;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
@ -1616,68 +1616,14 @@ public BlockLocation[] getFileBlockLocations(final Path p,
|
|||||||
final long offset, final long length) throws IOException {
|
final long offset, final long length) throws IOException {
|
||||||
statistics.incrementReadOps(1);
|
statistics.incrementReadOps(1);
|
||||||
storageStatistics.incrementOpCounter(OpType.GET_FILE_BLOCK_LOCATIONS);
|
storageStatistics.incrementOpCounter(OpType.GET_FILE_BLOCK_LOCATIONS);
|
||||||
BlockLocation[] locations = null;
|
|
||||||
try {
|
|
||||||
locations = getFileBlockLocations(
|
|
||||||
GetOpParam.Op.GETFILEBLOCKLOCATIONS,
|
|
||||||
p, offset, length);
|
|
||||||
} catch (RemoteException e) {
|
|
||||||
// See the error message from ExceptionHandle
|
|
||||||
if(e.getMessage() != null &&
|
|
||||||
e.getMessage().contains(
|
|
||||||
"Invalid value for webhdfs parameter") &&
|
|
||||||
e.getMessage().contains(
|
|
||||||
GetOpParam.Op.GETFILEBLOCKLOCATIONS.toString())) {
|
|
||||||
// Old webhdfs server doesn't support GETFILEBLOCKLOCATIONS
|
|
||||||
// operation, fall back to query again using old API
|
|
||||||
// GET_BLOCK_LOCATIONS.
|
|
||||||
LOG.info("Invalid webhdfs operation parameter "
|
|
||||||
+ GetOpParam.Op.GETFILEBLOCKLOCATIONS + ". Fallback to use "
|
|
||||||
+ GetOpParam.Op.GET_BLOCK_LOCATIONS + " instead.");
|
|
||||||
locations = getFileBlockLocations(
|
|
||||||
GetOpParam.Op.GET_BLOCK_LOCATIONS,
|
|
||||||
p, offset, length);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return locations;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
final HttpOpParam.Op op = GetOpParam.Op.GET_BLOCK_LOCATIONS;
|
||||||
* Get file block locations implementation. Provide a operation
|
return new FsPathResponseRunner<BlockLocation[]>(op, p,
|
||||||
* parameter to determine how to get block locations from a webhdfs
|
|
||||||
* server. Older server only supports <b>GET_BLOCK_LOCATIONS</b> but
|
|
||||||
* not <b>GETFILEBLOCKLOCATIONS</b>.
|
|
||||||
*
|
|
||||||
* @param path path to the file
|
|
||||||
* @param offset start offset in the given file
|
|
||||||
* @param length of the file to get locations for
|
|
||||||
* @param operation
|
|
||||||
* Valid operation is either
|
|
||||||
* {@link org.apache.hadoop.hdfs.web.resources.GetOpParam.Op
|
|
||||||
* #GET_BLOCK_LOCATIONS} or
|
|
||||||
* {@link org.apache.hadoop.hdfs.web.resources.GetOpParam.Op
|
|
||||||
* #GET_BLOCK_LOCATIONS}
|
|
||||||
* @throws IOException
|
|
||||||
* Http connection error, decoding error or given
|
|
||||||
* operation is not valid
|
|
||||||
*/
|
|
||||||
@VisibleForTesting
|
|
||||||
protected BlockLocation[] getFileBlockLocations(
|
|
||||||
GetOpParam.Op operation, final Path path,
|
|
||||||
final long offset, final long length) throws IOException {
|
|
||||||
return new FsPathResponseRunner<BlockLocation[]>(operation, path,
|
|
||||||
new OffsetParam(offset), new LengthParam(length)) {
|
new OffsetParam(offset), new LengthParam(length)) {
|
||||||
@Override
|
@Override
|
||||||
BlockLocation[] decodeResponse(Map<?,?> json) throws IOException {
|
BlockLocation[] decodeResponse(Map<?,?> json) throws IOException {
|
||||||
switch(operation) {
|
return DFSUtilClient.locatedBlocks2Locations(
|
||||||
case GETFILEBLOCKLOCATIONS:
|
JsonUtilClient.toLocatedBlocks(json));
|
||||||
return JsonUtilClient.toBlockLocationArray(json);
|
|
||||||
case GET_BLOCK_LOCATIONS:
|
|
||||||
return DFSUtilClient.locatedBlocks2Locations(
|
|
||||||
JsonUtilClient.toLocatedBlocks(json));
|
|
||||||
default :
|
|
||||||
throw new IOException("Unknown operation " + operation.name());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}.run();
|
}.run();
|
||||||
}
|
}
|
||||||
|
@ -33,18 +33,8 @@ public enum Op implements HttpOpParam.Op {
|
|||||||
GETHOMEDIRECTORY(false, HttpURLConnection.HTTP_OK),
|
GETHOMEDIRECTORY(false, HttpURLConnection.HTTP_OK),
|
||||||
GETDELEGATIONTOKEN(false, HttpURLConnection.HTTP_OK, true),
|
GETDELEGATIONTOKEN(false, HttpURLConnection.HTTP_OK, true),
|
||||||
|
|
||||||
/**
|
/** GET_BLOCK_LOCATIONS is a private unstable op. */
|
||||||
* GET_BLOCK_LOCATIONS is a private/stable API op. It returns a
|
|
||||||
* {@link org.apache.hadoop.hdfs.protocol.LocatedBlocks}
|
|
||||||
* json object.
|
|
||||||
*/
|
|
||||||
GET_BLOCK_LOCATIONS(false, HttpURLConnection.HTTP_OK),
|
GET_BLOCK_LOCATIONS(false, HttpURLConnection.HTTP_OK),
|
||||||
/**
|
|
||||||
* GETFILEBLOCKLOCATIONS is the public op that complies with
|
|
||||||
* {@link org.apache.hadoop.fs.FileSystem#getFileBlockLocations}
|
|
||||||
* interface.
|
|
||||||
*/
|
|
||||||
GETFILEBLOCKLOCATIONS(false, HttpURLConnection.HTTP_OK),
|
|
||||||
GETACLSTATUS(false, HttpURLConnection.HTTP_OK),
|
GETACLSTATUS(false, HttpURLConnection.HTTP_OK),
|
||||||
GETXATTRS(false, HttpURLConnection.HTTP_OK),
|
GETXATTRS(false, HttpURLConnection.HTTP_OK),
|
||||||
GETTRASHROOT(false, HttpURLConnection.HTTP_OK),
|
GETTRASHROOT(false, HttpURLConnection.HTTP_OK),
|
||||||
|
@ -54,7 +54,6 @@
|
|||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.BlockLocation;
|
|
||||||
import org.apache.hadoop.fs.ContentSummary;
|
import org.apache.hadoop.fs.ContentSummary;
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
@ -993,21 +992,6 @@ private Response get(
|
|||||||
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
|
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
case GETFILEBLOCKLOCATIONS:
|
|
||||||
{
|
|
||||||
final long offsetValue = offset.getValue();
|
|
||||||
final Long lengthValue = length.getValue();
|
|
||||||
|
|
||||||
FileSystem fs = FileSystem.get(conf != null ?
|
|
||||||
conf : new Configuration());
|
|
||||||
BlockLocation[] locations = fs.getFileBlockLocations(
|
|
||||||
new org.apache.hadoop.fs.Path(fullpath),
|
|
||||||
offsetValue,
|
|
||||||
lengthValue != null? lengthValue: Long.MAX_VALUE);
|
|
||||||
final String js = JsonUtil.toJsonString("BlockLocations",
|
|
||||||
JsonUtil.toJsonMap(locations));
|
|
||||||
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
|
|
||||||
}
|
|
||||||
case GET_BLOCK_LOCATIONS:
|
case GET_BLOCK_LOCATIONS:
|
||||||
{
|
{
|
||||||
final long offsetValue = offset.getValue();
|
final long offsetValue = offset.getValue();
|
||||||
|
@ -470,37 +470,4 @@ private static Object toJsonMap(BlockStoragePolicy blockStoragePolicy) {
|
|||||||
public static String toJsonString(BlockStoragePolicy storagePolicy) {
|
public static String toJsonString(BlockStoragePolicy storagePolicy) {
|
||||||
return toJsonString(BlockStoragePolicy.class, toJsonMap(storagePolicy));
|
return toJsonString(BlockStoragePolicy.class, toJsonMap(storagePolicy));
|
||||||
}
|
}
|
||||||
|
|
||||||
public static Map<String, Object> toJsonMap(BlockLocation[] locations)
|
|
||||||
throws IOException {
|
|
||||||
if(locations == null) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
final Map<String, Object> m = new TreeMap<String, Object>();
|
|
||||||
Object[] blockLocations = new Object[locations.length];
|
|
||||||
for(int i=0; i<locations.length; i++) {
|
|
||||||
blockLocations[i] = toJsonMap(locations[i]);
|
|
||||||
}
|
|
||||||
m.put(BlockLocation.class.getSimpleName(), blockLocations);
|
|
||||||
return m;
|
|
||||||
}
|
|
||||||
|
|
||||||
public static Map<String, Object> toJsonMap(
|
|
||||||
final BlockLocation blockLocation) throws IOException {
|
|
||||||
if (blockLocation == null) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
final Map<String, Object> m = new TreeMap<String, Object>();
|
|
||||||
m.put("length", blockLocation.getLength());
|
|
||||||
m.put("offset", blockLocation.getOffset());
|
|
||||||
m.put("corrupt", blockLocation.isCorrupt());
|
|
||||||
m.put("storageTypes", toJsonArray(blockLocation.getStorageTypes()));
|
|
||||||
m.put("storageIds", blockLocation.getStorageIds());
|
|
||||||
m.put("cachedHosts", blockLocation.getCachedHosts());
|
|
||||||
m.put("hosts", blockLocation.getHosts());
|
|
||||||
m.put("names", blockLocation.getNames());
|
|
||||||
m.put("topologyPaths", blockLocation.getTopologyPaths());
|
|
||||||
return m;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
@ -50,7 +50,6 @@ The HTTP REST API supports the complete [FileSystem](../../api/org/apache/hadoop
|
|||||||
* [`CHECKACCESS`](#Check_access) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).access)
|
* [`CHECKACCESS`](#Check_access) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).access)
|
||||||
* [`GETALLSTORAGEPOLICY`](#Get_all_Storage_Policies) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).getAllStoragePolicies)
|
* [`GETALLSTORAGEPOLICY`](#Get_all_Storage_Policies) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).getAllStoragePolicies)
|
||||||
* [`GETSTORAGEPOLICY`](#Get_Storage_Policy) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).getStoragePolicy)
|
* [`GETSTORAGEPOLICY`](#Get_Storage_Policy) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).getStoragePolicy)
|
||||||
* [`GETFILEBLOCKLOCATIONS`](#Get_File_Block_Locations) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).getFileBlockLocations)
|
|
||||||
* HTTP PUT
|
* HTTP PUT
|
||||||
* [`CREATE`](#Create_and_Write_to_a_File) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).create)
|
* [`CREATE`](#Create_and_Write_to_a_File) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).create)
|
||||||
* [`MKDIRS`](#Make_a_Directory) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).mkdirs)
|
* [`MKDIRS`](#Make_a_Directory) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).mkdirs)
|
||||||
@ -1069,7 +1068,7 @@ See also: [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).unsetStor
|
|||||||
{
|
{
|
||||||
"BlockStoragePolicy": {
|
"BlockStoragePolicy": {
|
||||||
"copyOnCreateFile": false,
|
"copyOnCreateFile": false,
|
||||||
"creationFallbacks": [],
|
"creationFallbacks": [],
|
||||||
"id":7,
|
"id":7,
|
||||||
"name":"HOT",
|
"name":"HOT",
|
||||||
"replicationFallbacks":["ARCHIVE"],
|
"replicationFallbacks":["ARCHIVE"],
|
||||||
@ -1079,51 +1078,6 @@ See also: [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).unsetStor
|
|||||||
|
|
||||||
See also: [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).getStoragePolicy
|
See also: [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).getStoragePolicy
|
||||||
|
|
||||||
### Get File Block Locations
|
|
||||||
|
|
||||||
* Submit a HTTP GET request.
|
|
||||||
|
|
||||||
curl -i "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=GETFILEBLOCKLOCATIONS
|
|
||||||
|
|
||||||
The client receives a response with a [`BlockLocations` JSON Object](#Block_Locations_JSON_Schema):
|
|
||||||
|
|
||||||
HTTP/1.1 200 OK
|
|
||||||
Content-Type: application/json
|
|
||||||
Transfer-Encoding: chunked
|
|
||||||
|
|
||||||
{
|
|
||||||
"BlockLocations" :
|
|
||||||
{
|
|
||||||
"BlockLocation":
|
|
||||||
[
|
|
||||||
{
|
|
||||||
"cachedHosts" : [],
|
|
||||||
"corrupt" : false,
|
|
||||||
"hosts" : ["host"],
|
|
||||||
"length" : 134217728, // length of this block
|
|
||||||
"names" : ["host:ip"],
|
|
||||||
"offset" : 0, // offset of the block in the file
|
|
||||||
"storageIds" : ["storageid"],
|
|
||||||
"storageTypes" : ["DISK"], // enum {RAM_DISK, SSD, DISK, ARCHIVE}
|
|
||||||
"topologyPaths" : ["/default-rack/hostname:ip"]
|
|
||||||
}, {
|
|
||||||
"cachedHosts" : [],
|
|
||||||
"corrupt" : false,
|
|
||||||
"hosts" : ["host"],
|
|
||||||
"length" : 62599364,
|
|
||||||
"names" : ["host:ip"],
|
|
||||||
"offset" : 134217728,
|
|
||||||
"storageIds" : ["storageid"],
|
|
||||||
"storageTypes" : ["DISK"],
|
|
||||||
"topologyPaths" : ["/default-rack/hostname:ip"]
|
|
||||||
},
|
|
||||||
...
|
|
||||||
]
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
See also: [`offset`](#Offset), [`length`](#Length), [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).getFileBlockLocations
|
|
||||||
|
|
||||||
Extended Attributes(XAttrs) Operations
|
Extended Attributes(XAttrs) Operations
|
||||||
--------------------------------------
|
--------------------------------------
|
||||||
|
|
||||||
@ -2082,146 +2036,6 @@ A `BlockStoragePolicies` JSON object represents an array of `BlockStoragePolicy`
|
|||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
#### BlockLocations JSON Schema
|
|
||||||
|
|
||||||
A `BlockLocations` JSON object represents an array of `BlockLocation` JSON objects.
|
|
||||||
|
|
||||||
```json
|
|
||||||
{
|
|
||||||
"name" : "BlockLocations",
|
|
||||||
"properties":
|
|
||||||
{
|
|
||||||
"BlockLocations":
|
|
||||||
{
|
|
||||||
"type" : "object",
|
|
||||||
"properties":
|
|
||||||
{
|
|
||||||
"BlockLocation":
|
|
||||||
{
|
|
||||||
"description": "An array of BlockLocation",
|
|
||||||
"type" : "array",
|
|
||||||
"items" : blockLocationProperties //See BlockLocation Properties
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
```
|
|
||||||
|
|
||||||
See also [`BlockLocation` Properties](#BlockLocation_Properties), [`GETFILEBLOCKLOCATIONS`](#Get_File_Block_Locations), [BlockLocation](../../api/org/apache/hadoop/fs/BlockLocation.html)
|
|
||||||
|
|
||||||
### BlockLocation JSON Schema
|
|
||||||
|
|
||||||
```json
|
|
||||||
{
|
|
||||||
"name" : "BlockLocation",
|
|
||||||
"properties":
|
|
||||||
{
|
|
||||||
"BlockLocation": blockLocationProperties //See BlockLocation Properties
|
|
||||||
}
|
|
||||||
}
|
|
||||||
```
|
|
||||||
|
|
||||||
See also [`BlockLocation` Properties](#BlockLocation_Properties), [`GETFILEBLOCKLOCATIONS`](#Get_File_Block_Locations), [BlockLocation](../../api/org/apache/hadoop/fs/BlockLocation.html)
|
|
||||||
|
|
||||||
#### BlockLocation Properties
|
|
||||||
|
|
||||||
JavaScript syntax is used to define `blockLocationProperties` so that it can be referred in both `BlockLocation` and `BlockLocations` JSON schemas.
|
|
||||||
|
|
||||||
```javascript
|
|
||||||
var blockLocationProperties =
|
|
||||||
{
|
|
||||||
"type" : "object",
|
|
||||||
"properties":
|
|
||||||
{
|
|
||||||
"cachedHosts":
|
|
||||||
{
|
|
||||||
"description": "Datanode hostnames with a cached replica",
|
|
||||||
"type" : "array",
|
|
||||||
"required" : "true",
|
|
||||||
"items" :
|
|
||||||
{
|
|
||||||
"description": "A datanode hostname",
|
|
||||||
"type" : "string"
|
|
||||||
}
|
|
||||||
},
|
|
||||||
"corrupt":
|
|
||||||
{
|
|
||||||
"description": "True if the block is corrupted",
|
|
||||||
"type" : "boolean",
|
|
||||||
"required" : "true"
|
|
||||||
},
|
|
||||||
"hosts":
|
|
||||||
{
|
|
||||||
"description": "Datanode hostnames store the block",
|
|
||||||
"type" : "array",
|
|
||||||
"required" : "true",
|
|
||||||
"items" :
|
|
||||||
{
|
|
||||||
"description": "A datanode hostname",
|
|
||||||
"type" : "string"
|
|
||||||
}
|
|
||||||
},
|
|
||||||
"length":
|
|
||||||
{
|
|
||||||
"description": "Length of the block",
|
|
||||||
"type" : "integer",
|
|
||||||
"required" : "true"
|
|
||||||
},
|
|
||||||
"names":
|
|
||||||
{
|
|
||||||
"description": "Datanode IP:xferPort for accessing the block",
|
|
||||||
"type" : "array",
|
|
||||||
"required" : "true",
|
|
||||||
"items" :
|
|
||||||
{
|
|
||||||
"description": "DatanodeIP:xferPort",
|
|
||||||
"type" : "string"
|
|
||||||
}
|
|
||||||
},
|
|
||||||
"offset":
|
|
||||||
{
|
|
||||||
"description": "Offset of the block in the file",
|
|
||||||
"type" : "integer",
|
|
||||||
"required" : "true"
|
|
||||||
},
|
|
||||||
"storageIds":
|
|
||||||
{
|
|
||||||
"description": "Storage ID of each replica",
|
|
||||||
"type" : "array",
|
|
||||||
"required" : "true",
|
|
||||||
"items" :
|
|
||||||
{
|
|
||||||
"description": "Storage ID",
|
|
||||||
"type" : "string"
|
|
||||||
}
|
|
||||||
},
|
|
||||||
"storageTypes":
|
|
||||||
{
|
|
||||||
"description": "Storage type of each replica",
|
|
||||||
"type" : "array",
|
|
||||||
"required" : "true",
|
|
||||||
"items" :
|
|
||||||
{
|
|
||||||
"description": "Storage type",
|
|
||||||
"enum" : ["RAM_DISK", "SSD", "DISK", "ARCHIVE"]
|
|
||||||
}
|
|
||||||
},
|
|
||||||
"topologyPaths":
|
|
||||||
{
|
|
||||||
"description": "Datanode addresses in network topology",
|
|
||||||
"type" : "array",
|
|
||||||
"required" : "true",
|
|
||||||
"items" :
|
|
||||||
{
|
|
||||||
"description": "/rack/host:ip",
|
|
||||||
"type" : "string"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
```
|
|
||||||
|
|
||||||
HTTP Query Parameter Dictionary
|
HTTP Query Parameter Dictionary
|
||||||
-------------------------------
|
-------------------------------
|
||||||
|
|
||||||
|
@ -29,7 +29,6 @@
|
|||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
import java.io.PrintWriter;
|
|
||||||
import java.net.HttpURLConnection;
|
import java.net.HttpURLConnection;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.net.SocketException;
|
import java.net.SocketException;
|
||||||
@ -39,16 +38,8 @@
|
|||||||
import java.net.URL;
|
import java.net.URL;
|
||||||
import java.security.PrivilegedExceptionAction;
|
import java.security.PrivilegedExceptionAction;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
|
|
||||||
import javax.servlet.ServletException;
|
|
||||||
import javax.servlet.http.HttpServlet;
|
|
||||||
import javax.servlet.http.HttpServletRequest;
|
|
||||||
import javax.servlet.http.HttpServletResponse;
|
|
||||||
import javax.ws.rs.core.MediaType;
|
|
||||||
|
|
||||||
import com.google.common.collect.ImmutableList;
|
import com.google.common.collect.ImmutableList;
|
||||||
import org.apache.commons.io.IOUtils;
|
import org.apache.commons.io.IOUtils;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
@ -80,12 +71,8 @@
|
|||||||
import org.apache.hadoop.hdfs.TestFileCreation;
|
import org.apache.hadoop.hdfs.TestFileCreation;
|
||||||
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
|
||||||
import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
|
import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
|
||||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
|
||||||
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
|
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper;
|
import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper;
|
||||||
@ -96,8 +83,6 @@
|
|||||||
import org.apache.hadoop.hdfs.web.resources.NoRedirectParam;
|
import org.apache.hadoop.hdfs.web.resources.NoRedirectParam;
|
||||||
import org.apache.hadoop.hdfs.web.resources.OffsetParam;
|
import org.apache.hadoop.hdfs.web.resources.OffsetParam;
|
||||||
import org.apache.hadoop.hdfs.web.resources.Param;
|
import org.apache.hadoop.hdfs.web.resources.Param;
|
||||||
import org.apache.hadoop.http.HttpServer2;
|
|
||||||
import org.apache.hadoop.http.HttpServerFunctionalTest;
|
|
||||||
import org.apache.hadoop.io.retry.RetryPolicy;
|
import org.apache.hadoop.io.retry.RetryPolicy;
|
||||||
import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
|
import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
|
||||||
import org.apache.hadoop.io.retry.RetryPolicy.RetryAction.RetryDecision;
|
import org.apache.hadoop.io.retry.RetryPolicy.RetryAction.RetryDecision;
|
||||||
@ -114,12 +99,8 @@
|
|||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.mockito.internal.util.reflection.Whitebox;
|
import org.mockito.internal.util.reflection.Whitebox;
|
||||||
|
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
|
||||||
import com.fasterxml.jackson.databind.type.MapType;
|
|
||||||
|
|
||||||
import static org.mockito.Matchers.any;
|
import static org.mockito.Matchers.any;
|
||||||
import static org.mockito.Matchers.anyInt;
|
import static org.mockito.Matchers.anyInt;
|
||||||
import static org.mockito.Matchers.anyLong;
|
|
||||||
import static org.mockito.Mockito.doReturn;
|
import static org.mockito.Mockito.doReturn;
|
||||||
import static org.mockito.Mockito.doThrow;
|
import static org.mockito.Mockito.doThrow;
|
||||||
import static org.mockito.Mockito.spy;
|
import static org.mockito.Mockito.spy;
|
||||||
@ -974,76 +955,6 @@ public void testWebHdfsGetBlockLocationsWithStorageType() throws Exception{
|
|||||||
Assert.assertTrue(storageTypes != null && storageTypes.length > 0 &&
|
Assert.assertTrue(storageTypes != null && storageTypes.length > 0 &&
|
||||||
storageTypes[0] == StorageType.DISK);
|
storageTypes[0] == StorageType.DISK);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Query webhdfs REST API to get block locations
|
|
||||||
InetSocketAddress addr = cluster.getNameNode().getHttpAddress();
|
|
||||||
|
|
||||||
// Case 1
|
|
||||||
// URL without length or offset parameters
|
|
||||||
URL url1 = new URL("http", addr.getHostString(), addr.getPort(),
|
|
||||||
WebHdfsFileSystem.PATH_PREFIX + "/foo?op=GETFILEBLOCKLOCATIONS");
|
|
||||||
LOG.info("Sending GETFILEBLOCKLOCATIONS request " + url1);
|
|
||||||
|
|
||||||
String response1 = getResponse(url1, "GET");
|
|
||||||
LOG.info("The output of GETFILEBLOCKLOCATIONS request " + response1);
|
|
||||||
// Parse BlockLocation array from json output using object mapper
|
|
||||||
BlockLocation[] locationArray1 = toBlockLocationArray(response1);
|
|
||||||
|
|
||||||
// Verify the result from rest call is same as file system api
|
|
||||||
verifyEquals(locations, locationArray1);
|
|
||||||
|
|
||||||
// Case 2
|
|
||||||
// URL contains length and offset parameters
|
|
||||||
URL url2 = new URL("http", addr.getHostString(), addr.getPort(),
|
|
||||||
WebHdfsFileSystem.PATH_PREFIX + "/foo?op=GETFILEBLOCKLOCATIONS"
|
|
||||||
+ "&length=" + LENGTH + "&offset=" + OFFSET);
|
|
||||||
LOG.info("Sending GETFILEBLOCKLOCATIONS request " + url2);
|
|
||||||
|
|
||||||
String response2 = getResponse(url2, "GET");
|
|
||||||
LOG.info("The output of GETFILEBLOCKLOCATIONS request " + response2);
|
|
||||||
BlockLocation[] locationArray2 = toBlockLocationArray(response2);
|
|
||||||
|
|
||||||
verifyEquals(locations, locationArray2);
|
|
||||||
|
|
||||||
// Case 3
|
|
||||||
// URL contains length parameter but without offset parameters
|
|
||||||
URL url3 = new URL("http", addr.getHostString(), addr.getPort(),
|
|
||||||
WebHdfsFileSystem.PATH_PREFIX + "/foo?op=GETFILEBLOCKLOCATIONS"
|
|
||||||
+ "&length=" + LENGTH);
|
|
||||||
LOG.info("Sending GETFILEBLOCKLOCATIONS request " + url3);
|
|
||||||
|
|
||||||
String response3 = getResponse(url3, "GET");
|
|
||||||
LOG.info("The output of GETFILEBLOCKLOCATIONS request " + response3);
|
|
||||||
BlockLocation[] locationArray3 = toBlockLocationArray(response3);
|
|
||||||
|
|
||||||
verifyEquals(locations, locationArray3);
|
|
||||||
|
|
||||||
// Case 4
|
|
||||||
// URL contains offset parameter but without length parameter
|
|
||||||
URL url4 = new URL("http", addr.getHostString(), addr.getPort(),
|
|
||||||
WebHdfsFileSystem.PATH_PREFIX + "/foo?op=GETFILEBLOCKLOCATIONS"
|
|
||||||
+ "&offset=" + OFFSET);
|
|
||||||
LOG.info("Sending GETFILEBLOCKLOCATIONS request " + url4);
|
|
||||||
|
|
||||||
String response4 = getResponse(url4, "GET");
|
|
||||||
LOG.info("The output of GETFILEBLOCKLOCATIONS request " + response4);
|
|
||||||
BlockLocation[] locationArray4 = toBlockLocationArray(response4);
|
|
||||||
|
|
||||||
verifyEquals(locations, locationArray4);
|
|
||||||
|
|
||||||
// Case 5
|
|
||||||
// URL specifies offset exceeds the file length
|
|
||||||
URL url5 = new URL("http", addr.getHostString(), addr.getPort(),
|
|
||||||
WebHdfsFileSystem.PATH_PREFIX + "/foo?op=GETFILEBLOCKLOCATIONS"
|
|
||||||
+ "&offset=1200");
|
|
||||||
LOG.info("Sending GETFILEBLOCKLOCATIONS request " + url5);
|
|
||||||
|
|
||||||
String response5 = getResponse(url5, "GET");
|
|
||||||
LOG.info("The output of GETFILEBLOCKLOCATIONS request " + response5);
|
|
||||||
BlockLocation[] locationArray5 = toBlockLocationArray(response5);
|
|
||||||
|
|
||||||
// Expected an empty array of BlockLocation
|
|
||||||
verifyEquals(new BlockLocation[] {}, locationArray5);
|
|
||||||
} finally {
|
} finally {
|
||||||
if (cluster != null) {
|
if (cluster != null) {
|
||||||
cluster.shutdown();
|
cluster.shutdown();
|
||||||
@ -1051,66 +962,6 @@ public void testWebHdfsGetBlockLocationsWithStorageType() throws Exception{
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private BlockLocation[] toBlockLocationArray(String json)
|
|
||||||
throws IOException {
|
|
||||||
ObjectMapper mapper = new ObjectMapper();
|
|
||||||
MapType subType = mapper.getTypeFactory().constructMapType(
|
|
||||||
Map.class,
|
|
||||||
String.class,
|
|
||||||
BlockLocation[].class);
|
|
||||||
MapType rootType = mapper.getTypeFactory().constructMapType(
|
|
||||||
Map.class,
|
|
||||||
mapper.constructType(String.class),
|
|
||||||
mapper.constructType(subType));
|
|
||||||
|
|
||||||
Map<String, Map<String, BlockLocation[]>> jsonMap = mapper
|
|
||||||
.readValue(json, rootType);
|
|
||||||
Map<String, BlockLocation[]> locationMap = jsonMap
|
|
||||||
.get("BlockLocations");
|
|
||||||
BlockLocation[] locationArray = locationMap.get(
|
|
||||||
BlockLocation.class.getSimpleName());
|
|
||||||
return locationArray;
|
|
||||||
}
|
|
||||||
|
|
||||||
private void verifyEquals(BlockLocation[] locations1,
|
|
||||||
BlockLocation[] locations2) throws IOException {
|
|
||||||
for(int i=0; i<locations1.length; i++) {
|
|
||||||
BlockLocation location1 = locations1[i];
|
|
||||||
BlockLocation location2 = locations2[i];
|
|
||||||
Assert.assertEquals(location1.getLength(),
|
|
||||||
location2.getLength());
|
|
||||||
Assert.assertEquals(location1.getOffset(),
|
|
||||||
location2.getOffset());
|
|
||||||
Assert.assertArrayEquals(location1.getCachedHosts(),
|
|
||||||
location2.getCachedHosts());
|
|
||||||
Assert.assertArrayEquals(location1.getHosts(),
|
|
||||||
location2.getHosts());
|
|
||||||
Assert.assertArrayEquals(location1.getNames(),
|
|
||||||
location2.getNames());
|
|
||||||
Assert.assertArrayEquals(location1.getStorageIds(),
|
|
||||||
location2.getStorageIds());
|
|
||||||
Assert.assertArrayEquals(location1.getTopologyPaths(),
|
|
||||||
location2.getTopologyPaths());
|
|
||||||
Assert.assertArrayEquals(location1.getStorageTypes(),
|
|
||||||
location2.getStorageTypes());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private static String getResponse(URL url, String httpRequestType)
|
|
||||||
throws IOException {
|
|
||||||
HttpURLConnection conn = null;
|
|
||||||
try {
|
|
||||||
conn = (HttpURLConnection) url.openConnection();
|
|
||||||
conn.setRequestMethod(httpRequestType);
|
|
||||||
conn.setInstanceFollowRedirects(false);
|
|
||||||
return IOUtils.toString(conn.getInputStream());
|
|
||||||
} finally {
|
|
||||||
if(conn != null) {
|
|
||||||
conn.disconnect();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private WebHdfsFileSystem createWebHDFSAsTestUser(final Configuration conf,
|
private WebHdfsFileSystem createWebHDFSAsTestUser(final Configuration conf,
|
||||||
final URI uri, final String userName) throws Exception {
|
final URI uri, final String userName) throws Exception {
|
||||||
|
|
||||||
@ -1467,131 +1318,4 @@ public void testWebHdfsAppend() throws Exception {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* A mock class to handle the {@link WebHdfsFileSystem} client
|
|
||||||
* request. The format of the response depends on how many of
|
|
||||||
* times it gets called (1 to 3 times).
|
|
||||||
* <p>
|
|
||||||
* First time call it return a wrapped json response with a
|
|
||||||
* IllegalArgumentException
|
|
||||||
* <p>
|
|
||||||
* Second time call it return a valid GET_BLOCK_LOCATIONS
|
|
||||||
* json response
|
|
||||||
* <p>
|
|
||||||
* Third time call it return a wrapped json response with
|
|
||||||
* a random IOException
|
|
||||||
*
|
|
||||||
*/
|
|
||||||
public static class MockWebHdfsServlet extends HttpServlet {
|
|
||||||
|
|
||||||
private static final long serialVersionUID = 1L;
|
|
||||||
private static int respondTimes = 0;
|
|
||||||
private static final String RANDOM_EXCEPTION_MSG =
|
|
||||||
"This is a random exception";
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void doGet(HttpServletRequest request,
|
|
||||||
HttpServletResponse response) throws ServletException, IOException {
|
|
||||||
response.setHeader("Content-Type",
|
|
||||||
MediaType.APPLICATION_JSON);
|
|
||||||
String param = request.getParameter("op");
|
|
||||||
if(respondTimes == 0) {
|
|
||||||
Exception mockException = new IllegalArgumentException(
|
|
||||||
"Invalid value for webhdfs parameter \"op\". "
|
|
||||||
+ "" + "No enum constant " + param);
|
|
||||||
sendException(request, response, mockException);
|
|
||||||
} else if (respondTimes == 1) {
|
|
||||||
sendResponse(request, response);
|
|
||||||
} else if (respondTimes == 2) {
|
|
||||||
Exception mockException = new IOException(RANDOM_EXCEPTION_MSG);
|
|
||||||
sendException(request, response, mockException);
|
|
||||||
}
|
|
||||||
respondTimes++;
|
|
||||||
}
|
|
||||||
|
|
||||||
private void sendResponse(HttpServletRequest request,
|
|
||||||
HttpServletResponse response) throws IOException {
|
|
||||||
response.setStatus(HttpServletResponse.SC_OK);
|
|
||||||
// Construct a LocatedBlock for testing
|
|
||||||
DatanodeInfo d = DFSTestUtil.getLocalDatanodeInfo();
|
|
||||||
DatanodeInfo[] ds = new DatanodeInfo[1];
|
|
||||||
ds[0] = d;
|
|
||||||
ExtendedBlock b1 = new ExtendedBlock("bpid", 1, 121, 1);
|
|
||||||
LocatedBlock l1 = new LocatedBlock(b1, ds);
|
|
||||||
l1.setStartOffset(0);
|
|
||||||
l1.setCorrupt(false);
|
|
||||||
List<LocatedBlock> ls = Arrays.asList(l1);
|
|
||||||
LocatedBlocks locatedblocks =
|
|
||||||
new LocatedBlocks(10, false, ls, l1,
|
|
||||||
true, null, null);
|
|
||||||
|
|
||||||
try (PrintWriter pw = response.getWriter()) {
|
|
||||||
pw.write(JsonUtil.toJsonString(locatedblocks));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void sendException(HttpServletRequest request,
|
|
||||||
HttpServletResponse response,
|
|
||||||
Exception mockException) throws IOException {
|
|
||||||
response.setStatus(HttpServletResponse.SC_BAD_REQUEST);
|
|
||||||
String errJs = JsonUtil.toJsonString(mockException);
|
|
||||||
try (PrintWriter pw = response.getWriter()) {
|
|
||||||
pw.write(errJs);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testGetFileBlockLocationsBackwardsCompatibility()
|
|
||||||
throws Exception {
|
|
||||||
final Configuration conf = WebHdfsTestUtil.createConf();
|
|
||||||
final String pathSpec = WebHdfsFileSystem.PATH_PREFIX + "/*";
|
|
||||||
HttpServer2 http = null;
|
|
||||||
try {
|
|
||||||
http = HttpServerFunctionalTest.createTestServer(conf);
|
|
||||||
http.addServlet("test", pathSpec, MockWebHdfsServlet.class);
|
|
||||||
http.start();
|
|
||||||
|
|
||||||
// Write the address back to configuration so
|
|
||||||
// WebHdfsFileSystem could connect to the mock server
|
|
||||||
conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY,
|
|
||||||
"localhost:" + http.getConnectorAddress(0).getPort());
|
|
||||||
|
|
||||||
final WebHdfsFileSystem webFS = WebHdfsTestUtil.getWebHdfsFileSystem(
|
|
||||||
conf, WebHdfsConstants.WEBHDFS_SCHEME);
|
|
||||||
|
|
||||||
WebHdfsFileSystem spyFs = spy(webFS);
|
|
||||||
BlockLocation[] locations = spyFs
|
|
||||||
.getFileBlockLocations(new Path("p"), 0, 100);
|
|
||||||
|
|
||||||
// Verify result
|
|
||||||
assertEquals(1, locations.length);
|
|
||||||
assertEquals(121, locations[0].getLength());
|
|
||||||
|
|
||||||
// Verify the fall back
|
|
||||||
// The function should be called exactly 2 times
|
|
||||||
// 1st time handles GETFILEBLOCKLOCATIONS and found it is not supported
|
|
||||||
// 2nd time fall back to handle GET_FILE_BLOCK_LOCATIONS
|
|
||||||
verify(spyFs, times(2)).getFileBlockLocations(any(),
|
|
||||||
any(), anyLong(), anyLong());
|
|
||||||
|
|
||||||
// Verify it doesn't erroneously fall back
|
|
||||||
// When server returns a different error, it should directly
|
|
||||||
// throw an exception.
|
|
||||||
try {
|
|
||||||
spyFs.getFileBlockLocations(new Path("p"), 0, 100);
|
|
||||||
} catch (Exception e) {
|
|
||||||
assertTrue(e instanceof IOException);
|
|
||||||
assertEquals(e.getMessage(), MockWebHdfsServlet.RANDOM_EXCEPTION_MSG);
|
|
||||||
// Totally this function has been called 3 times
|
|
||||||
verify(spyFs, times(3)).getFileBlockLocations(any(),
|
|
||||||
any(), anyLong(), anyLong());
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
if(http != null) {
|
|
||||||
http.stop();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user