HDFS-12459. Fix revert: Add new op GETFILEBLOCKLOCATIONS to WebHDFS REST API. Contributed by Weiwei Yang.

Signed-off-by: Wei-Chiu Chuang <weichiu@apache.org>
This commit is contained in:
Weiwei Yang 2018-10-10 10:07:22 -07:00 committed by Wei-Chiu Chuang
parent cdc4350718
commit 3ead525c71
5 changed files with 378 additions and 1 deletions

View File

@ -33,8 +33,18 @@ public enum Op implements HttpOpParam.Op {
GETHOMEDIRECTORY(false, HttpURLConnection.HTTP_OK),
GETDELEGATIONTOKEN(false, HttpURLConnection.HTTP_OK, true),
/** GET_BLOCK_LOCATIONS is a private unstable op. */
/**
* GET_BLOCK_LOCATIONS is a private/stable API op. It returns a
* {@link org.apache.hadoop.hdfs.protocol.LocatedBlocks}
* json object.
*/
GET_BLOCK_LOCATIONS(false, HttpURLConnection.HTTP_OK),
/**
* GETFILEBLOCKLOCATIONS is the public op that complies with
* {@link org.apache.hadoop.fs.FileSystem#getFileBlockLocations}
* interface.
*/
GETFILEBLOCKLOCATIONS(false, HttpURLConnection.HTTP_OK),
GETACLSTATUS(false, HttpURLConnection.HTTP_OK),
GETXATTRS(false, HttpURLConnection.HTTP_OK),
GETTRASHROOT(false, HttpURLConnection.HTTP_OK),

View File

@ -59,6 +59,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileEncryptionInfo;
import org.apache.hadoop.fs.FileStatus;
@ -72,6 +73,7 @@
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.XAttrHelper;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
@ -1075,6 +1077,18 @@ protected Response get(
.build();
}
}
case GETFILEBLOCKLOCATIONS:
{
final long offsetValue = offset.getValue();
final Long lengthValue = length.getValue();
LocatedBlocks locatedBlocks = getRpcClientProtocol()
.getBlockLocations(fullpath, offsetValue, lengthValue != null ?
lengthValue : Long.MAX_VALUE);
BlockLocation[] locations =
DFSUtilClient.locatedBlocks2Locations(locatedBlocks);
final String js = JsonUtil.toJsonString(locations);
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
}
case GET_BLOCK_LOCATIONS:
{
final long offsetValue = offset.getValue();

View File

@ -572,4 +572,36 @@ private static Object toJsonMap(
m.put("dirStatus", toJsonMap(snapshottableDirectoryStatus.getDirStatus()));
return m;
}
private static Map<String, Object> toJsonMap(
final BlockLocation blockLocation) throws IOException {
if (blockLocation == null) {
return null;
}
final Map<String, Object> m = new HashMap<>();
m.put("length", blockLocation.getLength());
m.put("offset", blockLocation.getOffset());
m.put("corrupt", blockLocation.isCorrupt());
m.put("storageTypes", toJsonArray(blockLocation.getStorageTypes()));
m.put("cachedHosts", blockLocation.getCachedHosts());
m.put("hosts", blockLocation.getHosts());
m.put("names", blockLocation.getNames());
m.put("topologyPaths", blockLocation.getTopologyPaths());
return m;
}
public static String toJsonString(BlockLocation[] locations)
throws IOException {
if (locations == null) {
return null;
}
final Map<String, Object> m = new HashMap<>();
Object[] blockLocations = new Object[locations.length];
for(int i=0; i<locations.length; i++) {
blockLocations[i] = toJsonMap(locations[i]);
}
m.put(BlockLocation.class.getSimpleName(), blockLocations);
return toJsonString("BlockLocations", m);
}
}

View File

@ -52,6 +52,7 @@ The HTTP REST API supports the complete [FileSystem](../../api/org/apache/hadoop
* [`GETSTORAGEPOLICY`](#Get_Storage_Policy) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).getStoragePolicy)
* [`GETSNAPSHOTDIFF`](#Get_Snapshot_Diff)
* [`GETSNAPSHOTTABLEDIRECTORYLIST`](#Get_Snapshottable_Directory_List)
* [`GETFILEBLOCKLOCATIONS`](#Get_File_Block_Locations) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).getFileBlockLocations)
* HTTP PUT
* [`CREATE`](#Create_and_Write_to_a_File) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).create)
* [`MKDIRS`](#Make_a_Directory) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).mkdirs)
@ -1087,6 +1088,49 @@ See also: [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).unsetStor
See also: [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).getStoragePolicy
### Get File Block Locations
* Submit a HTTP GET request.
curl -i "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=GETFILEBLOCKLOCATIONS
The client receives a response with a [`BlockLocations` JSON Object](#Block_Locations_JSON_Schema):
HTTP/1.1 200 OK
Content-Type: application/json
Transfer-Encoding: chunked
{
"BlockLocations" :
{
"BlockLocation":
[
{
"cachedHosts" : [],
"corrupt" : false,
"hosts" : ["host"],
"length" : 134217728, // length of this block
"names" : ["host:ip"],
"offset" : 0, // offset of the block in the file
"storageTypes" : ["DISK"], // enum {RAM_DISK, SSD, DISK, ARCHIVE}
"topologyPaths" : ["/default-rack/hostname:ip"]
}, {
"cachedHosts" : [],
"corrupt" : false,
"hosts" : ["host"],
"length" : 62599364,
"names" : ["host:ip"],
"offset" : 134217728,
"storageTypes" : ["DISK"],
"topologyPaths" : ["/default-rack/hostname:ip"]
},
...
]
}
}
See also: [`offset`](#Offset), [`length`](#Length), [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).getFileBlockLocations
Extended Attributes(XAttrs) Operations
--------------------------------------
@ -2227,6 +2271,135 @@ var snapshottableDirectoryStatus =
}
```
### BlockLocations JSON Schema
A `BlockLocations` JSON object represents an array of `BlockLocation` JSON objects.
```json
{
"name" : "BlockLocations",
"properties":
{
"BlockLocations":
{
"type" : "object",
"properties":
{
"BlockLocation":
{
"description": "An array of BlockLocation",
"type" : "array",
"items" : blockLocationProperties //See BlockLocation Properties
}
}
}
}
}
```
See also [`BlockLocation` Properties](#BlockLocation_Properties), [`GETFILEBLOCKLOCATIONS`](#Get_File_Block_Locations), [BlockLocation](../../api/org/apache/hadoop/fs/BlockLocation.html)
### BlockLocation JSON Schema
```json
{
"name" : "BlockLocation",
"properties":
{
"BlockLocation": blockLocationProperties //See BlockLocation Properties
}
}
```
See also [`BlockLocation` Properties](#BlockLocation_Properties), [`GETFILEBLOCKLOCATIONS`](#Get_File_Block_Locations), [BlockLocation](../../api/org/apache/hadoop/fs/BlockLocation.html)
#### BlockLocation Properties
JavaScript syntax is used to define `blockLocationProperties` so that it can be referred in both `BlockLocation` and `BlockLocations` JSON schemas.
```javascript
var blockLocationProperties =
{
"type" : "object",
"properties":
{
"cachedHosts":
{
"description": "Datanode hostnames with a cached replica",
"type" : "array",
"required" : "true",
"items" :
{
"description": "A datanode hostname",
"type" : "string"
}
},
"corrupt":
{
"description": "True if the block is corrupted",
"type" : "boolean",
"required" : "true"
},
"hosts":
{
"description": "Datanode hostnames store the block",
"type" : "array",
"required" : "true",
"items" :
{
"description": "A datanode hostname",
"type" : "string"
}
},
"length":
{
"description": "Length of the block",
"type" : "integer",
"required" : "true"
},
"names":
{
"description": "Datanode IP:xferPort for accessing the block",
"type" : "array",
"required" : "true",
"items" :
{
"description": "DatanodeIP:xferPort",
"type" : "string"
}
},
"offset":
{
"description": "Offset of the block in the file",
"type" : "integer",
"required" : "true"
},
"storageTypes":
{
"description": "Storage type of each replica",
"type" : "array",
"required" : "true",
"items" :
{
"description": "Storage type",
"enum" : ["RAM_DISK", "SSD", "DISK", "ARCHIVE"]
}
},
"topologyPaths":
{
"description": "Datanode addresses in network topology",
"type" : "array",
"required" : "true",
"items" :
{
"description": "/rack/host:ip",
"type" : "string"
}
}
}
};
```
HTTP Query Parameter Dictionary
-------------------------------

View File

@ -47,6 +47,7 @@
import java.nio.charset.StandardCharsets;
import java.security.PrivilegedExceptionAction;
import java.util.Arrays;
import java.util.Map;
import java.util.Random;
import com.google.common.collect.ImmutableList;
@ -118,6 +119,9 @@
import org.junit.Test;
import org.mockito.Mockito;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.type.MapType;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Mockito.doReturn;
@ -1136,6 +1140,150 @@ public void testWebHdfsGetBlockLocationsWithStorageType() throws Exception{
}
}
@Test
public void testWebHdfsGetBlockLocations() throws Exception{
MiniDFSCluster cluster = null;
final Configuration conf = WebHdfsTestUtil.createConf();
final int offset = 42;
final int length = 512;
final Path path = new Path("/foo");
byte[] contents = new byte[1024];
RANDOM.nextBytes(contents);
try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
final WebHdfsFileSystem fs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
WebHdfsConstants.WEBHDFS_SCHEME);
try (OutputStream os = fs.create(path)) {
os.write(contents);
}
BlockLocation[] locations = fs.getFileBlockLocations(path, offset,
length);
// Query webhdfs REST API to get block locations
InetSocketAddress addr = cluster.getNameNode().getHttpAddress();
// Case 1
// URL without length or offset parameters
URL url1 = new URL("http", addr.getHostString(), addr.getPort(),
WebHdfsFileSystem.PATH_PREFIX + "/foo?op=GETFILEBLOCKLOCATIONS");
String response1 = getResponse(url1, "GET");
// Parse BlockLocation array from json output using object mapper
BlockLocation[] locationArray1 = toBlockLocationArray(response1);
// Verify the result from rest call is same as file system api
verifyEquals(locations, locationArray1);
// Case 2
// URL contains length and offset parameters
URL url2 = new URL("http", addr.getHostString(), addr.getPort(),
WebHdfsFileSystem.PATH_PREFIX + "/foo?op=GETFILEBLOCKLOCATIONS"
+ "&length=" + length + "&offset=" + offset);
String response2 = getResponse(url2, "GET");
BlockLocation[] locationArray2 = toBlockLocationArray(response2);
verifyEquals(locations, locationArray2);
// Case 3
// URL contains length parameter but without offset parameters
URL url3 = new URL("http", addr.getHostString(), addr.getPort(),
WebHdfsFileSystem.PATH_PREFIX + "/foo?op=GETFILEBLOCKLOCATIONS"
+ "&length=" + length);
String response3 = getResponse(url3, "GET");
BlockLocation[] locationArray3 = toBlockLocationArray(response3);
verifyEquals(locations, locationArray3);
// Case 4
// URL contains offset parameter but without length parameter
URL url4 = new URL("http", addr.getHostString(), addr.getPort(),
WebHdfsFileSystem.PATH_PREFIX + "/foo?op=GETFILEBLOCKLOCATIONS"
+ "&offset=" + offset);
String response4 = getResponse(url4, "GET");
BlockLocation[] locationArray4 = toBlockLocationArray(response4);
verifyEquals(locations, locationArray4);
// Case 5
// URL specifies offset exceeds the file length
URL url5 = new URL("http", addr.getHostString(), addr.getPort(),
WebHdfsFileSystem.PATH_PREFIX + "/foo?op=GETFILEBLOCKLOCATIONS"
+ "&offset=1200");
String response5 = getResponse(url5, "GET");
BlockLocation[] locationArray5 = toBlockLocationArray(response5);
// Expected an empty array of BlockLocation
verifyEquals(new BlockLocation[] {}, locationArray5);
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
private BlockLocation[] toBlockLocationArray(String json)
throws IOException {
ObjectMapper mapper = new ObjectMapper();
MapType subType = mapper.getTypeFactory().constructMapType(
Map.class,
String.class,
BlockLocation[].class);
MapType rootType = mapper.getTypeFactory().constructMapType(
Map.class,
mapper.constructType(String.class),
mapper.constructType(subType));
Map<String, Map<String, BlockLocation[]>> jsonMap = mapper
.readValue(json, rootType);
Map<String, BlockLocation[]> locationMap = jsonMap
.get("BlockLocations");
BlockLocation[] locationArray = locationMap.get(
BlockLocation.class.getSimpleName());
return locationArray;
}
private void verifyEquals(BlockLocation[] locations1,
BlockLocation[] locations2) throws IOException {
for(int i=0; i<locations1.length; i++) {
BlockLocation location1 = locations1[i];
BlockLocation location2 = locations2[i];
Assert.assertEquals(location1.getLength(),
location2.getLength());
Assert.assertEquals(location1.getOffset(),
location2.getOffset());
Assert.assertArrayEquals(location1.getCachedHosts(),
location2.getCachedHosts());
Assert.assertArrayEquals(location1.getHosts(),
location2.getHosts());
Assert.assertArrayEquals(location1.getNames(),
location2.getNames());
Assert.assertArrayEquals(location1.getTopologyPaths(),
location2.getTopologyPaths());
Assert.assertArrayEquals(location1.getStorageTypes(),
location2.getStorageTypes());
}
}
private static String getResponse(URL url, String httpRequestType)
throws IOException {
HttpURLConnection conn = null;
try {
conn = (HttpURLConnection) url.openConnection();
conn.setRequestMethod(httpRequestType);
conn.setInstanceFollowRedirects(false);
return IOUtils.toString(conn.getInputStream(),
StandardCharsets.UTF_8);
} finally {
if(conn != null) {
conn.disconnect();
}
}
}
private WebHdfsFileSystem createWebHDFSAsTestUser(final Configuration conf,
final URI uri, final String userName) throws Exception {