Revert "HDFS-6874. Add GETFILEBLOCKLOCATIONS operation to HttpFS. Contributed by Weiwei Yang"
This reverts commit 931a49800e
.
Conflicts:
hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/HttpFSFileSystem.java
hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/FSOperations.java
hadoop-hdfs-project/hadoop-hdfs-httpfs/src/test/java/org/apache/hadoop/fs/http/client/BaseTestHttpFSWith.java
This commit is contained in:
parent
49467165a5
commit
2f867115a8
@ -23,12 +23,9 @@
|
|||||||
import java.util.EnumSet;
|
import java.util.EnumSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
|
||||||
import com.fasterxml.jackson.databind.type.MapType;
|
|
||||||
import com.google.common.base.Charsets;
|
import com.google.common.base.Charsets;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.BlockLocation;
|
|
||||||
import org.apache.hadoop.fs.ContentSummary;
|
import org.apache.hadoop.fs.ContentSummary;
|
||||||
import org.apache.hadoop.fs.DelegationTokenRenewer;
|
import org.apache.hadoop.fs.DelegationTokenRenewer;
|
||||||
import org.apache.hadoop.fs.FSDataInputStream;
|
import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
@ -122,8 +119,6 @@ public class HttpFSFileSystem extends FileSystem
|
|||||||
public static final String NEW_LENGTH_PARAM = "newlength";
|
public static final String NEW_LENGTH_PARAM = "newlength";
|
||||||
public static final String START_AFTER_PARAM = "startAfter";
|
public static final String START_AFTER_PARAM = "startAfter";
|
||||||
public static final String POLICY_NAME_PARAM = "storagepolicy";
|
public static final String POLICY_NAME_PARAM = "storagepolicy";
|
||||||
public static final String OFFSET_PARAM = "offset";
|
|
||||||
public static final String LENGTH_PARAM = "length";
|
|
||||||
public static final String SNAPSHOT_NAME_PARAM = "snapshotname";
|
public static final String SNAPSHOT_NAME_PARAM = "snapshotname";
|
||||||
public static final String OLD_SNAPSHOT_NAME_PARAM = "oldsnapshotname";
|
public static final String OLD_SNAPSHOT_NAME_PARAM = "oldsnapshotname";
|
||||||
|
|
||||||
@ -210,7 +205,6 @@ public static FILE_TYPE getType(FileStatus fileStatus) {
|
|||||||
|
|
||||||
public static final String STORAGE_POLICIES_JSON = "BlockStoragePolicies";
|
public static final String STORAGE_POLICIES_JSON = "BlockStoragePolicies";
|
||||||
public static final String STORAGE_POLICY_JSON = "BlockStoragePolicy";
|
public static final String STORAGE_POLICY_JSON = "BlockStoragePolicy";
|
||||||
public static final String BLOCK_LOCATIONS_JSON = "BlockLocations";
|
|
||||||
|
|
||||||
public static final int HTTP_TEMPORARY_REDIRECT = 307;
|
public static final int HTTP_TEMPORARY_REDIRECT = 307;
|
||||||
|
|
||||||
@ -1359,42 +1353,6 @@ public BlockStoragePolicy getStoragePolicy(Path src) throws IOException {
|
|||||||
return createStoragePolicy((JSONObject) json.get(STORAGE_POLICY_JSON));
|
return createStoragePolicy((JSONObject) json.get(STORAGE_POLICY_JSON));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public BlockLocation[] getFileBlockLocations(FileStatus file, long start,
|
|
||||||
long len) throws IOException {
|
|
||||||
Map<String, String> params = new HashMap<String, String>();
|
|
||||||
params.put(OP_PARAM, Operation.GETFILEBLOCKLOCATIONS.toString());
|
|
||||||
params.put(OFFSET_PARAM, Long.toString(start));
|
|
||||||
params.put(LENGTH_PARAM, Long.toString(len));
|
|
||||||
HttpURLConnection conn =
|
|
||||||
getConnection(Operation.GETFILEBLOCKLOCATIONS.getMethod(), params,
|
|
||||||
file.getPath(), true);
|
|
||||||
HttpExceptionUtils.validateResponse(conn, HttpURLConnection.HTTP_OK);
|
|
||||||
JSONObject json = (JSONObject) HttpFSUtils.jsonParse(conn);
|
|
||||||
return toBlockLocations(json);
|
|
||||||
}
|
|
||||||
|
|
||||||
private BlockLocation[] toBlockLocations(JSONObject json)
|
|
||||||
throws IOException {
|
|
||||||
ObjectMapper mapper = new ObjectMapper();
|
|
||||||
MapType subType = mapper.getTypeFactory().constructMapType(
|
|
||||||
Map.class,
|
|
||||||
String.class,
|
|
||||||
BlockLocation[].class);
|
|
||||||
MapType rootType = mapper.getTypeFactory().constructMapType(
|
|
||||||
Map.class,
|
|
||||||
mapper.constructType(String.class),
|
|
||||||
mapper.constructType(subType));
|
|
||||||
|
|
||||||
Map<String, Map<String, BlockLocation[]>> jsonMap = mapper
|
|
||||||
.readValue(json.toJSONString(), rootType);
|
|
||||||
Map<String, BlockLocation[]> locationMap = jsonMap
|
|
||||||
.get(BLOCK_LOCATIONS_JSON);
|
|
||||||
BlockLocation[] locationArray = locationMap.get(
|
|
||||||
BlockLocation.class.getSimpleName());
|
|
||||||
return locationArray;
|
|
||||||
}
|
|
||||||
|
|
||||||
private BlockStoragePolicy createStoragePolicy(JSONObject policyJson)
|
private BlockStoragePolicy createStoragePolicy(JSONObject policyJson)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
byte id = ((Number) policyJson.get("id")).byteValue();
|
byte id = ((Number) policyJson.get("id")).byteValue();
|
||||||
|
@ -18,7 +18,6 @@
|
|||||||
package org.apache.hadoop.fs.http.server;
|
package org.apache.hadoop.fs.http.server;
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.fs.BlockLocation;
|
|
||||||
import org.apache.hadoop.fs.BlockStoragePolicySpi;
|
import org.apache.hadoop.fs.BlockStoragePolicySpi;
|
||||||
import org.apache.hadoop.fs.ContentSummary;
|
import org.apache.hadoop.fs.ContentSummary;
|
||||||
import org.apache.hadoop.fs.FileChecksum;
|
import org.apache.hadoop.fs.FileChecksum;
|
||||||
@ -36,7 +35,6 @@
|
|||||||
import org.apache.hadoop.fs.permission.AclStatus;
|
import org.apache.hadoop.fs.permission.AclStatus;
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
||||||
import org.apache.hadoop.hdfs.web.JsonUtil;
|
|
||||||
import org.apache.hadoop.io.IOUtils;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
import org.apache.hadoop.lib.service.FileSystemAccess;
|
import org.apache.hadoop.lib.service.FileSystemAccess;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
@ -1458,41 +1456,6 @@ public Void execute(FileSystem fs) throws IOException {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Executor that performs a getFileBlockLocations FileSystemAccess
|
|
||||||
* file system operation.
|
|
||||||
*/
|
|
||||||
@InterfaceAudience.Private
|
|
||||||
@SuppressWarnings("rawtypes")
|
|
||||||
public static class FSFileBlockLocations implements
|
|
||||||
FileSystemAccess.FileSystemExecutor<Map> {
|
|
||||||
private Path path;
|
|
||||||
private long offsetValue;
|
|
||||||
private long lengthValue;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Creates a file-block-locations executor.
|
|
||||||
*
|
|
||||||
* @param path the path to retrieve the location
|
|
||||||
* @param offsetValue offset into the given file
|
|
||||||
* @param lengthValue length for which to get locations for
|
|
||||||
*/
|
|
||||||
public FSFileBlockLocations(String path, long offsetValue,
|
|
||||||
long lengthValue) {
|
|
||||||
this.path = new Path(path);
|
|
||||||
this.offsetValue = offsetValue;
|
|
||||||
this.lengthValue = lengthValue;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Map execute(FileSystem fs) throws IOException {
|
|
||||||
BlockLocation[] locations =
|
|
||||||
fs.getFileBlockLocations(this.path, this.offsetValue,
|
|
||||||
this.lengthValue);
|
|
||||||
return JsonUtil.toJsonMap(locations);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Executor that performs a createSnapshot FileSystemAccess operation.
|
* Executor that performs a createSnapshot FileSystemAccess operation.
|
||||||
*/
|
*/
|
||||||
@ -1596,5 +1559,4 @@ public Void execute(FileSystem fs) throws IOException {
|
|||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -58,8 +58,7 @@ public class HttpFSParametersProvider extends ParametersProvider {
|
|||||||
PARAMS_DEF.put(Operation.GETHOMEDIRECTORY, new Class[]{});
|
PARAMS_DEF.put(Operation.GETHOMEDIRECTORY, new Class[]{});
|
||||||
PARAMS_DEF.put(Operation.GETCONTENTSUMMARY, new Class[]{});
|
PARAMS_DEF.put(Operation.GETCONTENTSUMMARY, new Class[]{});
|
||||||
PARAMS_DEF.put(Operation.GETFILECHECKSUM, new Class[]{});
|
PARAMS_DEF.put(Operation.GETFILECHECKSUM, new Class[]{});
|
||||||
PARAMS_DEF.put(Operation.GETFILEBLOCKLOCATIONS,
|
PARAMS_DEF.put(Operation.GETFILEBLOCKLOCATIONS, new Class[]{});
|
||||||
new Class[] {OffsetParam.class, LenParam.class});
|
|
||||||
PARAMS_DEF.put(Operation.GETACLSTATUS, new Class[]{});
|
PARAMS_DEF.put(Operation.GETACLSTATUS, new Class[]{});
|
||||||
PARAMS_DEF.put(Operation.GETTRASHROOT, new Class[]{});
|
PARAMS_DEF.put(Operation.GETTRASHROOT, new Class[]{});
|
||||||
PARAMS_DEF.put(Operation.INSTRUMENTATION, new Class[]{});
|
PARAMS_DEF.put(Operation.INSTRUMENTATION, new Class[]{});
|
||||||
|
@ -51,7 +51,6 @@
|
|||||||
import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.XAttrNameParam;
|
import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.XAttrNameParam;
|
||||||
import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.XAttrSetFlagParam;
|
import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.XAttrSetFlagParam;
|
||||||
import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.XAttrValueParam;
|
import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.XAttrValueParam;
|
||||||
import org.apache.hadoop.hdfs.web.JsonUtil;
|
|
||||||
import org.apache.hadoop.http.JettyUtils;
|
import org.apache.hadoop.http.JettyUtils;
|
||||||
import org.apache.hadoop.lib.service.FileSystemAccess;
|
import org.apache.hadoop.lib.service.FileSystemAccess;
|
||||||
import org.apache.hadoop.lib.service.FileSystemAccessException;
|
import org.apache.hadoop.lib.service.FileSystemAccessException;
|
||||||
@ -299,25 +298,7 @@ public InputStream run() throws Exception {
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case GETFILEBLOCKLOCATIONS: {
|
case GETFILEBLOCKLOCATIONS: {
|
||||||
long offset = 0;
|
response = Response.status(Response.Status.BAD_REQUEST).build();
|
||||||
// In case length is not given, reset to max long
|
|
||||||
// in order to retrieve all file block locations
|
|
||||||
long len = Long.MAX_VALUE;
|
|
||||||
Long offsetParam = params.get(OffsetParam.NAME, OffsetParam.class);
|
|
||||||
Long lenParam = params.get(LenParam.NAME, LenParam.class);
|
|
||||||
AUDIT_LOG.info("[{}] offset [{}] len [{}]",
|
|
||||||
new Object[] {path, offsetParam, lenParam});
|
|
||||||
if (offsetParam != null && offsetParam.longValue() > 0) {
|
|
||||||
offset = offsetParam.longValue();
|
|
||||||
}
|
|
||||||
if (lenParam != null && lenParam.longValue() > 0) {
|
|
||||||
len = lenParam.longValue();
|
|
||||||
}
|
|
||||||
FSOperations.FSFileBlockLocations command =
|
|
||||||
new FSOperations.FSFileBlockLocations(path, offset, len);
|
|
||||||
@SuppressWarnings("rawtypes") Map locations = fsExecute(user, command);
|
|
||||||
final String json = JsonUtil.toJsonString("BlockLocations", locations);
|
|
||||||
response = Response.ok(json).type(MediaType.APPLICATION_JSON).build();
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case GETACLSTATUS: {
|
case GETACLSTATUS: {
|
||||||
|
@ -20,7 +20,6 @@
|
|||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.BlockStoragePolicySpi;
|
import org.apache.hadoop.fs.BlockStoragePolicySpi;
|
||||||
import org.apache.hadoop.fs.BlockLocation;
|
|
||||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||||
import org.apache.hadoop.fs.ContentSummary;
|
import org.apache.hadoop.fs.ContentSummary;
|
||||||
import org.apache.hadoop.fs.FileChecksum;
|
import org.apache.hadoop.fs.FileChecksum;
|
||||||
@ -1041,7 +1040,7 @@ protected enum Operation {
|
|||||||
WORKING_DIRECTORY, MKDIRS, SET_TIMES, SET_PERMISSION, SET_OWNER,
|
WORKING_DIRECTORY, MKDIRS, SET_TIMES, SET_PERMISSION, SET_OWNER,
|
||||||
SET_REPLICATION, CHECKSUM, CONTENT_SUMMARY, FILEACLS, DIRACLS, SET_XATTR,
|
SET_REPLICATION, CHECKSUM, CONTENT_SUMMARY, FILEACLS, DIRACLS, SET_XATTR,
|
||||||
GET_XATTRS, REMOVE_XATTR, LIST_XATTRS, ENCRYPTION, LIST_STATUS_BATCH,
|
GET_XATTRS, REMOVE_XATTR, LIST_XATTRS, ENCRYPTION, LIST_STATUS_BATCH,
|
||||||
GETTRASHROOT, STORAGEPOLICY, ERASURE_CODING, GETFILEBLOCKLOCATIONS,
|
GETTRASHROOT, STORAGEPOLICY, ERASURE_CODING,
|
||||||
CREATE_SNAPSHOT, RENAME_SNAPSHOT, DELETE_SNAPSHOT
|
CREATE_SNAPSHOT, RENAME_SNAPSHOT, DELETE_SNAPSHOT
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1131,9 +1130,6 @@ private void operation(Operation op) throws Exception {
|
|||||||
case ERASURE_CODING:
|
case ERASURE_CODING:
|
||||||
testErasureCoding();
|
testErasureCoding();
|
||||||
break;
|
break;
|
||||||
case GETFILEBLOCKLOCATIONS:
|
|
||||||
testGetFileBlockLocations();
|
|
||||||
break;
|
|
||||||
case CREATE_SNAPSHOT:
|
case CREATE_SNAPSHOT:
|
||||||
testCreateSnapshot();
|
testCreateSnapshot();
|
||||||
break;
|
break;
|
||||||
@ -1189,88 +1185,6 @@ public Void run() throws Exception {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
private void testGetFileBlockLocations() throws Exception {
|
|
||||||
BlockLocation[] locations1, locations2, locations11, locations21 = null;
|
|
||||||
Path testFile = null;
|
|
||||||
|
|
||||||
// Test single block file block locations.
|
|
||||||
try (FileSystem fs = FileSystem.get(getProxiedFSConf())) {
|
|
||||||
testFile = new Path(getProxiedFSTestDir(), "singleBlock.txt");
|
|
||||||
DFSTestUtil.createFile(fs, testFile, (long) 1, (short) 1, 0L);
|
|
||||||
locations1 = fs.getFileBlockLocations(testFile, 0, 1);
|
|
||||||
Assert.assertNotNull(locations1);
|
|
||||||
}
|
|
||||||
|
|
||||||
try (FileSystem fs = getHttpFSFileSystem()) {
|
|
||||||
locations2 = fs.getFileBlockLocations(testFile, 0, 1);
|
|
||||||
Assert.assertNotNull(locations2);
|
|
||||||
}
|
|
||||||
|
|
||||||
verifyBlockLocations(locations1, locations2);
|
|
||||||
|
|
||||||
// Test multi-block single replica file block locations.
|
|
||||||
try (FileSystem fs = FileSystem.get(getProxiedFSConf())) {
|
|
||||||
testFile = new Path(getProxiedFSTestDir(), "multipleBlocks.txt");
|
|
||||||
DFSTestUtil.createFile(fs, testFile, 512, (short) 2048,
|
|
||||||
(long) 512, (short) 1, 0L);
|
|
||||||
locations1 = fs.getFileBlockLocations(testFile, 0, 1024);
|
|
||||||
locations11 = fs.getFileBlockLocations(testFile, 1024, 2048);
|
|
||||||
Assert.assertNotNull(locations1);
|
|
||||||
Assert.assertNotNull(locations11);
|
|
||||||
}
|
|
||||||
|
|
||||||
try (FileSystem fs = getHttpFSFileSystem()) {
|
|
||||||
locations2 = fs.getFileBlockLocations(testFile, 0, 1024);
|
|
||||||
locations21 = fs.getFileBlockLocations(testFile, 1024, 2048);
|
|
||||||
Assert.assertNotNull(locations2);
|
|
||||||
Assert.assertNotNull(locations21);
|
|
||||||
}
|
|
||||||
|
|
||||||
verifyBlockLocations(locations1, locations2);
|
|
||||||
verifyBlockLocations(locations11, locations21);
|
|
||||||
|
|
||||||
// Test multi-block multi-replica file block locations.
|
|
||||||
try (FileSystem fs = FileSystem.get(getProxiedFSConf())) {
|
|
||||||
testFile = new Path(getProxiedFSTestDir(), "multipleBlocks.txt");
|
|
||||||
DFSTestUtil.createFile(fs, testFile, 512, (short) 2048,
|
|
||||||
(long) 512, (short) 3, 0L);
|
|
||||||
locations1 = fs.getFileBlockLocations(testFile, 0, 2048);
|
|
||||||
Assert.assertNotNull(locations1);
|
|
||||||
}
|
|
||||||
|
|
||||||
try (FileSystem fs = getHttpFSFileSystem()) {
|
|
||||||
locations2 = fs.getFileBlockLocations(testFile, 0, 2048);
|
|
||||||
Assert.assertNotNull(locations2);
|
|
||||||
}
|
|
||||||
|
|
||||||
verifyBlockLocations(locations1, locations2);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void verifyBlockLocations(BlockLocation[] locations1,
|
|
||||||
BlockLocation[] locations2) throws IOException {
|
|
||||||
Assert.assertEquals(locations1.length, locations2.length);
|
|
||||||
for (int i = 0; i < locations1.length; i++) {
|
|
||||||
BlockLocation location1 = locations1[i];
|
|
||||||
BlockLocation location2 = locations2[i];
|
|
||||||
|
|
||||||
Assert.assertEquals(location1.isCorrupt(), location2.isCorrupt());
|
|
||||||
Assert.assertEquals(location1.getOffset(), location2.getOffset());
|
|
||||||
Assert.assertEquals(location1.getLength(), location2.getLength());
|
|
||||||
|
|
||||||
Arrays.sort(location1.getHosts());
|
|
||||||
Arrays.sort(location2.getHosts());
|
|
||||||
Arrays.sort(location1.getNames());
|
|
||||||
Arrays.sort(location2.getNames());
|
|
||||||
Arrays.sort(location1.getTopologyPaths());
|
|
||||||
Arrays.sort(location2.getTopologyPaths());
|
|
||||||
|
|
||||||
Assert.assertArrayEquals(location1.getHosts(), location2.getHosts());
|
|
||||||
Assert.assertArrayEquals(location1.getNames(), location2.getNames());
|
|
||||||
Assert.assertArrayEquals(location1.getTopologyPaths(),
|
|
||||||
location2.getTopologyPaths());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void testCreateSnapshot(String snapshotName) throws Exception {
|
private void testCreateSnapshot(String snapshotName) throws Exception {
|
||||||
if (!this.isLocalFS()) {
|
if (!this.isLocalFS()) {
|
||||||
Path snapshottablePath = new Path("/tmp/tmp-snap-test");
|
Path snapshottablePath = new Path("/tmp/tmp-snap-test");
|
||||||
@ -1363,5 +1277,4 @@ private void testDeleteSnapshot() throws Exception {
|
|||||||
fs.delete(snapshottablePath, true);
|
fs.delete(snapshottablePath, true);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user