From 9a8aff69ff70555eab7c71e93697005487e9a164 Mon Sep 17 00:00:00 2001
From: Ashutosh Gupta
Date: Mon, 17 Oct 2022 11:56:15 +0100
Subject: [PATCH] HDFS-6874. Add GETFILEBLOCKLOCATIONS operation to HttpFS (#4750)

Co-authored-by: Ashutosh Gupta
---
 .../hadoop/hdfs/DistributedFileSystem.java         | 33 +++++++++
 .../hadoop/hdfs/web/JsonUtilClient.java            | 52 +++++++++++++
 .../hadoop/hdfs/web/WebHdfsFileSystem.java         | 49 +++++++++++--
 .../hadoop-hdfs-httpfs/pom.xml                     | 10 +++
 .../fs/http/client/HttpFSFileSystem.java           | 49 ++++++++++++-
 .../hadoop/fs/http/server/FSOperations.java        | 73 +++++++++++++++++++
 .../http/server/HttpFSParametersProvider.java      |  4 +-
 .../hadoop/fs/http/server/HttpFSServer.java        | 38 +++++++++-
 .../fs/http/client/BaseTestHttpFSWith.java         | 45 +++++++++++-
 .../fs/http/server/TestHttpFSServer.java           | 37 ++++++++++
 .../org/apache/hadoop/hdfs/web/JsonUtil.java       | 26 ++++++-
 .../hadoop/hdfs/web/TestJsonUtilClient.java        | 68 +++++++++++++++++
 12 files changed, 466 insertions(+), 18 deletions(-)
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestJsonUtilClient.java

diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
index 93db332d73..2d8953e98b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs;
 
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.util.Preconditions;
@@ -3898,4 +3899,36 @@ public DatanodeInfo[] getSlowDatanodeStats() throws IOException {
     return dfs.slowDatanodeReport();
   }
 
+  /**
+   * Returns the LocatedBlocks of the corresponding HDFS file p, from offset
+   * start for length len.
+   * This is similar to {@link #getFileBlockLocations(Path, long, long)} except
+   * that it returns LocatedBlocks rather than a BlockLocation array.
+   * @param p path representing the file of interest.
+   * @param start offset
+   * @param len length
+   * @return a LocatedBlocks object
+   * @throws IOException
+   */
+  public LocatedBlocks getLocatedBlocks(Path p, long start, long len)
+      throws IOException {
+    final Path absF = fixRelativePart(p);
+    return new FileSystemLinkResolver<LocatedBlocks>() {
+      @Override
+      public LocatedBlocks doCall(final Path p) throws IOException {
+        return dfs.getLocatedBlocks(getPathName(p), start, len);
+      }
+      @Override
+      public LocatedBlocks next(final FileSystem fs, final Path p)
+          throws IOException {
+        if (fs instanceof DistributedFileSystem) {
+          DistributedFileSystem myDfs = (DistributedFileSystem) fs;
+          return myDfs.getLocatedBlocks(p, start, len);
+        }
+        throw new UnsupportedOperationException("Cannot getLocatedBlocks "
+            + "through a symlink to a non-DistributedFileSystem: " + fs
+            + " -> " + p);
+      }
+    }.resolve(this, absF);
+  }
 }
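A minimal sketch of driving the new getLocatedBlocks() helper; the hdfs://localhost:8020 URI and the /tmp/data.bin path are hypothetical stand-ins for a real cluster and file:

    import java.net.URI;
    import java.util.Arrays;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.DistributedFileSystem;
    import org.apache.hadoop.hdfs.protocol.LocatedBlock;
    import org.apache.hadoop.hdfs.protocol.LocatedBlocks;

    public class LocatedBlocksSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Hypothetical NameNode URI; getLocatedBlocks is DistributedFileSystem-only.
        FileSystem fs = FileSystem.get(URI.create("hdfs://localhost:8020"), conf);
        DistributedFileSystem dfs = (DistributedFileSystem) fs;
        // Unlike getFileBlockLocations(), this keeps block-level detail
        // (block IDs, generation stamps, replica locations).
        LocatedBlocks blocks =
            dfs.getLocatedBlocks(new Path("/tmp/data.bin"), 0, Long.MAX_VALUE);
        for (LocatedBlock b : blocks.getLocatedBlocks()) {
          System.out.println(b.getBlock() + " -> " + Arrays.toString(b.getLocations()));
        }
      }
    }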
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/JsonUtilClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/JsonUtilClient.java
index 75163c16d7..c3dd556ba5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/JsonUtilClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/JsonUtilClient.java
@@ -19,6 +19,9 @@
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.ObjectReader;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.util.Preconditions;
 import org.apache.hadoop.thirdparty.com.google.common.collect.Maps;
 import org.apache.hadoop.fs.ContentSummary;
@@ -965,4 +968,53 @@ private static SnapshotStatus toSnapshotStatus(
         SnapshotStatus.getParentPath(fullPath)));
     return snapshotStatus;
   }
+
+  @VisibleForTesting
+  public static BlockLocation[] toBlockLocationArray(Map<?, ?> json)
+      throws IOException {
+    final Map<?, ?> rootmap =
+        (Map<?, ?>) json.get(BlockLocation.class.getSimpleName() + "s");
+    final List<?> array =
+        JsonUtilClient.getList(rootmap, BlockLocation.class.getSimpleName());
+    Preconditions.checkNotNull(array);
+    final BlockLocation[] locations = new BlockLocation[array.size()];
+    int i = 0;
+    for (Object object : array) {
+      final Map<?, ?> m = (Map<?, ?>) object;
+      locations[i++] = JsonUtilClient.toBlockLocation(m);
+    }
+    return locations;
+  }
+
+  /** Convert a Json map to a BlockLocation. */
+  private static BlockLocation toBlockLocation(Map<?, ?> m) throws IOException {
+    if (m == null) {
+      return null;
+    }
+    long length = ((Number) m.get("length")).longValue();
+    long offset = ((Number) m.get("offset")).longValue();
+    boolean corrupt = Boolean.parseBoolean(m.get("corrupt").toString());
+    String[] storageIds = toStringArray(getList(m, "storageIds"));
+    String[] cachedHosts = toStringArray(getList(m, "cachedHosts"));
+    String[] hosts = toStringArray(getList(m, "hosts"));
+    String[] names = toStringArray(getList(m, "names"));
+    String[] topologyPaths = toStringArray(getList(m, "topologyPaths"));
+    StorageType[] storageTypes = toStorageTypeArray(getList(m, "storageTypes"));
+    return new BlockLocation(names, hosts, cachedHosts, topologyPaths,
+        storageIds, storageTypes, offset, length, corrupt);
+  }
+
+  @VisibleForTesting
+  static String[] toStringArray(List<?> list) {
+    if (list == null) {
+      return null;
+    } else {
+      final String[] array = new String[list.size()];
+      int i = 0;
+      for (Object object : list) {
+        array[i++] = object.toString();
+      }
+      return array;
+    }
+  }
 }
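The JSON layout this parser expects mirrors what the server-side JsonUtil emits: a "BlockLocations" object wrapping a "BlockLocation" array. A self-contained sketch; the host, port, and field values in the payload are made up:

    import java.util.Map;
    import org.apache.hadoop.fs.BlockLocation;
    import org.apache.hadoop.hdfs.web.JsonUtilClient;
    import org.apache.hadoop.util.JsonSerialization;

    public class BlockLocationJsonSketch {
      public static void main(String[] args) throws Exception {
        // Hand-written payload with the same nesting the patch produces.
        String json = "{\"BlockLocations\":{\"BlockLocation\":[{"
            + "\"offset\":0,\"length\":1,\"corrupt\":false,"
            + "\"names\":[\"127.0.0.1:9866\"],\"hosts\":[\"127.0.0.1\"],"
            + "\"cachedHosts\":[],\"topologyPaths\":[\"/default-rack/127.0.0.1:9866\"],"
            + "\"storageIds\":[],\"storageTypes\":[\"DISK\"]}]}}";
        Map<?, ?> jsonMap = JsonSerialization.mapReader().readValue(json);
        BlockLocation[] locations = JsonUtilClient.toBlockLocationArray(jsonMap);
        System.out.println(locations[0]);
      }
    }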
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
index 5afb526675..f0774e98d1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
@@ -183,6 +183,8 @@ public class WebHdfsFileSystem extends FileSystem
   private KeyProvider testProvider;
   private boolean isTLSKrb;
 
+  private boolean isServerHCFSCompatible = true;
+
   /**
    * Return the protocol scheme for the FileSystem.
    *
@@ -1882,18 +1884,51 @@ public BlockLocation[] getFileBlockLocations(final FileStatus status,
   }
 
   @Override
-  public BlockLocation[] getFileBlockLocations(final Path p,
-      final long offset, final long length) throws IOException {
+  public BlockLocation[] getFileBlockLocations(final Path p, final long offset,
+      final long length) throws IOException {
     statistics.incrementReadOps(1);
     storageStatistics.incrementOpCounter(OpType.GET_FILE_BLOCK_LOCATIONS);
+    BlockLocation[] locations;
+    try {
+      if (isServerHCFSCompatible) {
+        locations = getFileBlockLocations(GetOpParam.Op.GETFILEBLOCKLOCATIONS, p, offset, length);
+      } else {
+        locations = getFileBlockLocations(GetOpParam.Op.GET_BLOCK_LOCATIONS, p, offset, length);
+      }
+    } catch (RemoteException e) {
+      // Parsing the exception is needed only if the client thinks the service
+      // is compatible.
+      if (isServerHCFSCompatible && isGetFileBlockLocationsException(e)) {
+        LOG.warn("Server does not appear to support GETFILEBLOCKLOCATIONS. "
+            + "Falling back to the old GET_BLOCK_LOCATIONS. Exception: {}",
+            e.getMessage());
+        isServerHCFSCompatible = false;
+        locations = getFileBlockLocations(GetOpParam.Op.GET_BLOCK_LOCATIONS, p, offset, length);
+      } else {
+        throw e;
+      }
+    }
+    return locations;
+  }
 
-    final HttpOpParam.Op op = GetOpParam.Op.GET_BLOCK_LOCATIONS;
-    return new FsPathResponseRunner<BlockLocation[]>(op, p,
+  private boolean isGetFileBlockLocationsException(RemoteException e) {
+    return e.getMessage() != null && e.getMessage().contains("Invalid value for webhdfs parameter")
+        && e.getMessage().contains(GetOpParam.Op.GETFILEBLOCKLOCATIONS.toString());
+  }
+
+  private BlockLocation[] getFileBlockLocations(final GetOpParam.Op operation,
+      final Path p, final long offset, final long length) throws IOException {
+    return new FsPathResponseRunner<BlockLocation[]>(operation, p,
         new OffsetParam(offset), new LengthParam(length)) {
       @Override
-      BlockLocation[] decodeResponse(Map<?, ?> json) throws IOException {
-        return DFSUtilClient.locatedBlocks2Locations(
-            JsonUtilClient.toLocatedBlocks(json));
+      BlockLocation[] decodeResponse(Map<?, ?> json) throws IOException {
+        switch (operation) {
+        case GETFILEBLOCKLOCATIONS:
+          return JsonUtilClient.toBlockLocationArray(json);
+        case GET_BLOCK_LOCATIONS:
+          return DFSUtilClient.locatedBlocks2Locations(JsonUtilClient.toLocatedBlocks(json));
+        default:
+          throw new IOException("Unknown operation " + operation.name());
+        }
       }
     }.run();
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-httpfs/pom.xml b/hadoop-hdfs-project/hadoop-hdfs-httpfs/pom.xml
index a1b3ab1f92..b471fd062d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-httpfs/pom.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs-httpfs/pom.xml
@@ -199,6 +199,16 @@
       <artifactId>bcprov-jdk15on</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>com.squareup.okhttp3</groupId>
+      <artifactId>mockwebserver</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.squareup.okhttp3</groupId>
+      <artifactId>okhttp</artifactId>
+      <scope>test</scope>
+    </dependency>
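At the wire level the two code paths differ only in the op query parameter. A sketch of the new-style request against a placeholder NameNode HTTP address; an older server rejects this op with "Invalid value for webhdfs parameter", which is exactly what trips the fallback above:

    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.net.HttpURLConnection;
    import java.net.URL;
    import java.nio.charset.StandardCharsets;

    public class WebHdfsRestSketch {
      public static void main(String[] args) throws Exception {
        // localhost:9870 and /tmp/data.bin are placeholders.
        URL url = new URL("http://localhost:9870/webhdfs/v1/tmp/data.bin"
            + "?op=GETFILEBLOCKLOCATIONS&offset=0&length=1");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        try (BufferedReader in = new BufferedReader(
            new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
          String line;
          while ((line = in.readLine()) != null) {
            // Expected shape: {"BlockLocations":{"BlockLocation":[...]}}
            System.out.println(line);
          }
        }
        conn.disconnect();
      }
    }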
diff --git a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/HttpFSFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/HttpFSFileSystem.java
index 10dc787fa1..f34a27e027 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/HttpFSFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/HttpFSFileSystem.java
@@ -24,8 +24,12 @@
 import java.util.List;
 
 import org.apache.hadoop.thirdparty.com.google.common.base.Charsets;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.type.MapType;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.CommonPathCapabilities;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.DelegationTokenRenewer;
@@ -140,6 +144,8 @@ public class HttpFSFileSystem extends FileSystem
   public static final String SNAPSHOT_DIFF_INDEX = "snapshotdiffindex";
   public static final String FSACTION_MODE_PARAM = "fsaction";
   public static final String EC_POLICY_NAME_PARAM = "ecpolicy";
+  public static final String OFFSET_PARAM = "offset";
+  public static final String LENGTH_PARAM = "length";
 
   public static final Short DEFAULT_PERMISSION = 0755;
   public static final String ACLSPEC_DEFAULT = "";
@@ -239,6 +245,7 @@ public static FILE_TYPE getType(FileStatus fileStatus) {
 
   public static final String STORAGE_POLICIES_JSON = "BlockStoragePolicies";
   public static final String STORAGE_POLICY_JSON = "BlockStoragePolicy";
+  public static final String BLOCK_LOCATIONS_JSON = "BlockLocations";
 
   public static final int HTTP_TEMPORARY_REDIRECT = 307;
 
@@ -269,7 +276,8 @@ public enum Operation {
     GETSNAPSHOTTABLEDIRECTORYLIST(HTTP_GET), GETSNAPSHOTLIST(HTTP_GET),
     GETSERVERDEFAULTS(HTTP_GET),
     CHECKACCESS(HTTP_GET), SETECPOLICY(HTTP_PUT), GETECPOLICY(HTTP_GET), UNSETECPOLICY(
-        HTTP_POST), SATISFYSTORAGEPOLICY(HTTP_PUT), GETSNAPSHOTDIFFLISTING(HTTP_GET);
+        HTTP_POST), SATISFYSTORAGEPOLICY(HTTP_PUT), GETSNAPSHOTDIFFLISTING(HTTP_GET),
+    GET_BLOCK_LOCATIONS(HTTP_GET);
 
     private String httpMethod;
 
@@ -1710,4 +1718,41 @@ public void satisfyStoragePolicy(final Path path) throws IOException {
         Operation.SATISFYSTORAGEPOLICY.getMethod(), params, path, true);
     HttpExceptionUtils.validateResponse(conn, HttpURLConnection.HTTP_OK);
   }
-}
+
+  @Override
+  public BlockLocation[] getFileBlockLocations(Path path, long start, long len)
+      throws IOException {
+    Map<String, String> params = new HashMap<>();
+    params.put(OP_PARAM, Operation.GETFILEBLOCKLOCATIONS.toString());
+    params.put(OFFSET_PARAM, Long.toString(start));
+    params.put(LENGTH_PARAM, Long.toString(len));
+    HttpURLConnection conn = getConnection(
+        Operation.GETFILEBLOCKLOCATIONS.getMethod(), params, path, true);
+    HttpExceptionUtils.validateResponse(conn, HttpURLConnection.HTTP_OK);
+    JSONObject json = (JSONObject) HttpFSUtils.jsonParse(conn);
+    return toBlockLocations(json);
+  }
+
+  @Override
+  public BlockLocation[] getFileBlockLocations(final FileStatus status,
+      final long offset, final long length) throws IOException {
+    if (status == null) {
+      return null;
+    }
+    return getFileBlockLocations(status.getPath(), offset, length);
+  }
+
+  @VisibleForTesting
+  static BlockLocation[] toBlockLocations(JSONObject json) throws IOException {
+    ObjectMapper mapper = new ObjectMapper();
+    MapType subType = mapper.getTypeFactory().constructMapType(Map.class,
+        String.class, BlockLocation[].class);
+    MapType rootType = mapper.getTypeFactory().constructMapType(Map.class,
+        mapper.constructType(String.class), mapper.constructType(subType));
+
+    Map<String, Map<String, BlockLocation[]>> jsonMap =
+        mapper.readValue(json.toJSONString(), rootType);
+    Map<String, BlockLocation[]> locationMap =
+        jsonMap.get(BLOCK_LOCATIONS_JSON);
+    return locationMap.get(BlockLocation.class.getSimpleName());
+  }
+}
\ No newline at end of file
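With this change an HttpFS client can use the standard FileSystem contract end to end. A sketch, assuming an HttpFS gateway on its usual default port 14000 and the webhdfs:// scheme pointed at HttpFSFileSystem via fs.webhdfs.impl, the same kind of override the HttpFS test suite uses:

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.BlockLocation;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.http.client.HttpFSFileSystem;

    public class HttpFSBlockLocationsSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Route webhdfs:// to HttpFSFileSystem (test-style override).
        conf.set("fs.webhdfs.impl", HttpFSFileSystem.class.getName());
        // Placeholder gateway address and file path.
        FileSystem fs = FileSystem.get(URI.create("webhdfs://localhost:14000"), conf);
        for (BlockLocation location
            : fs.getFileBlockLocations(new Path("/tmp/data.bin"), 0, 1)) {
          System.out.println(location);
        }
      }
    }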
diff --git a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/FSOperations.java b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/FSOperations.java
index 33f2abbb31..9f70351dfc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/FSOperations.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/FSOperations.java
@@ -19,6 +19,7 @@
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.BlockStoragePolicySpi;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.FileChecksum;
@@ -44,6 +45,7 @@
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReportListing;
 import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
@@ -2192,4 +2194,75 @@ public Void execute(FileSystem fs) throws IOException {
       return null;
     }
   }
+
+  /**
+   * Executor that performs a getFileBlockLocations operation.
+   */
+  @InterfaceAudience.Private
+  @SuppressWarnings("rawtypes")
+  public static class FSFileBlockLocations
+      implements FileSystemAccess.FileSystemExecutor<Map> {
+    private final Path path;
+    private final long offsetValue;
+    private final long lengthValue;
+
+    /**
+     * Creates a file-block-locations executor.
+     *
+     * @param path the path for which to retrieve block locations
+     * @param offsetValue offset into the given file
+     * @param lengthValue length of the range to get locations for
+     */
+    public FSFileBlockLocations(String path, long offsetValue, long lengthValue) {
+      this.path = new Path(path);
+      this.offsetValue = offsetValue;
+      this.lengthValue = lengthValue;
+    }
+
+    @Override
+    public Map execute(FileSystem fs) throws IOException {
+      BlockLocation[] locations = fs.getFileBlockLocations(this.path,
+          this.offsetValue, this.lengthValue);
+      return JsonUtil.toJsonMap(locations);
+    }
+  }
+
+  /**
+   * Executor that performs a getFileBlockLocations operation for legacy
+   * clients that support only GET_BLOCK_LOCATIONS.
+   */
+  @InterfaceAudience.Private
+  @SuppressWarnings("rawtypes")
+  public static class FSFileBlockLocationsLegacy
+      implements FileSystemAccess.FileSystemExecutor<Map> {
+    private final Path path;
+    private final long offsetValue;
+    private final long lengthValue;
+
+    /**
+     * Creates a file-block-locations executor.
+     *
+     * @param path the path for which to retrieve block locations
+     * @param offsetValue offset into the given file
+     * @param lengthValue length of the range to get locations for
+     */
+    public FSFileBlockLocationsLegacy(String path, long offsetValue, long lengthValue) {
+      this.path = new Path(path);
+      this.offsetValue = offsetValue;
+      this.lengthValue = lengthValue;
+    }
+
+    @Override
+    public Map execute(FileSystem fs) throws IOException {
+      if (fs instanceof DistributedFileSystem) {
+        DistributedFileSystem dfs = (DistributedFileSystem) fs;
+        LocatedBlocks locations = dfs.getLocatedBlocks(
+            this.path, this.offsetValue, this.lengthValue);
+        return JsonUtil.toJsonMap(locations);
+      }
+      throw new IOException("Unable to support FSFileBlockLocationsLegacy "
+          + "because the file system is not DistributedFileSystem.");
+    }
+  }
 }
\ No newline at end of file
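Server-side, HttpFSServer hands these executors to fsExecute() with the authenticated user's FileSystem. A sketch driving the non-legacy executor directly; any FileSystem should work since it only needs getFileBlockLocations(), and the path is a placeholder:

    import java.util.Map;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.http.server.FSOperations;
    import org.apache.hadoop.hdfs.web.JsonUtil;

    public class ExecutorSketch {
      @SuppressWarnings("rawtypes")
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        FSOperations.FSFileBlockLocations command =
            new FSOperations.FSFileBlockLocations("/tmp/data.bin", 0, Long.MAX_VALUE);
        Map locations = command.execute(fs);
        // Same wrapping HttpFSServer applies before writing the response body.
        System.out.println(JsonUtil.toJsonString("BlockLocations", locations));
      }
    }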
diff --git a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/HttpFSParametersProvider.java b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/HttpFSParametersProvider.java
index 41009c7b51..6943636f8f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/HttpFSParametersProvider.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/HttpFSParametersProvider.java
@@ -60,7 +60,8 @@ public class HttpFSParametersProvider extends ParametersProvider {
     PARAMS_DEF.put(Operation.GETQUOTAUSAGE, new Class[]{});
     PARAMS_DEF.put(Operation.GETFILECHECKSUM,
         new Class[]{NoRedirectParam.class});
-    PARAMS_DEF.put(Operation.GETFILEBLOCKLOCATIONS, new Class[]{});
+    PARAMS_DEF.put(Operation.GETFILEBLOCKLOCATIONS,
+        new Class[] {OffsetParam.class, LenParam.class});
     PARAMS_DEF.put(Operation.GETACLSTATUS, new Class[]{});
     PARAMS_DEF.put(Operation.GETTRASHROOT, new Class[]{});
     PARAMS_DEF.put(Operation.INSTRUMENTATION, new Class[]{});
@@ -127,6 +128,7 @@ public class HttpFSParametersProvider extends ParametersProvider {
     PARAMS_DEF.put(Operation.GETECPOLICY, new Class[] {});
     PARAMS_DEF.put(Operation.UNSETECPOLICY, new Class[] {});
     PARAMS_DEF.put(Operation.SATISFYSTORAGEPOLICY, new Class[] {});
+    PARAMS_DEF.put(Operation.GET_BLOCK_LOCATIONS, new Class[] {OffsetParam.class, LenParam.class});
   }
 
   public HttpFSParametersProvider() {
diff --git a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/HttpFSServer.java b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/HttpFSServer.java
index 399ff3bde9..b50d24900a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/HttpFSServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/HttpFSServer.java
@@ -370,7 +370,23 @@ public InputStream run() throws Exception {
       break;
     }
     case GETFILEBLOCKLOCATIONS: {
-      response = Response.status(Response.Status.BAD_REQUEST).build();
+      long offset = 0;
+      long len = Long.MAX_VALUE;
+      Long offsetParam = params.get(OffsetParam.NAME, OffsetParam.class);
+      Long lenParam = params.get(LenParam.NAME, LenParam.class);
+      AUDIT_LOG.info("[{}] offset [{}] len [{}]", path, offsetParam, lenParam);
+      if (offsetParam != null && offsetParam > 0) {
+        offset = offsetParam;
+      }
+      if (lenParam != null && lenParam > 0) {
+        len = lenParam;
+      }
+      FSOperations.FSFileBlockLocations command =
+          new FSOperations.FSFileBlockLocations(path, offset, len);
+      @SuppressWarnings("rawtypes")
+      Map locations = fsExecute(user, command);
+      final String json = JsonUtil.toJsonString("BlockLocations", locations);
+      response = Response.ok(json).type(MediaType.APPLICATION_JSON).build();
       break;
     }
     case GETACLSTATUS: {
@@ -510,6 +526,26 @@ public InputStream run() throws Exception {
       response = Response.ok(js).type(MediaType.APPLICATION_JSON).build();
       break;
     }
+    case GET_BLOCK_LOCATIONS: {
+      long offset = 0;
+      long len = Long.MAX_VALUE;
+      Long offsetParam = params.get(OffsetParam.NAME, OffsetParam.class);
+      Long lenParam = params.get(LenParam.NAME, LenParam.class);
+      AUDIT_LOG.info("[{}] offset [{}] len [{}]", path, offsetParam, lenParam);
+      if (offsetParam != null && offsetParam > 0) {
+        offset = offsetParam;
+      }
+      if (lenParam != null && lenParam > 0) {
+        len = lenParam;
+      }
+      FSOperations.FSFileBlockLocationsLegacy command =
+          new FSOperations.FSFileBlockLocationsLegacy(path, offset, len);
+      @SuppressWarnings("rawtypes")
+      Map locations = fsExecute(user, command);
+      final String json = JsonUtil.toJsonString("LocatedBlocks", locations);
+      response = Response.ok(json).type(MediaType.APPLICATION_JSON).build();
+      break;
+    }
     default: {
       throw new IOException(
           MessageFormat.format("Invalid HTTP GET operation [{0}]", op.value()));
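Both server cases read the same offset/length query parameters and differ only in the response envelope; summarizing the two shapes the handlers above produce (field contents elided):

    // op=GETFILEBLOCKLOCATIONS -> {"BlockLocations":{"BlockLocation":[...]}}    (HCFS-style)
    // op=GET_BLOCK_LOCATIONS   -> {"LocatedBlocks":{"locatedBlocks":[...],...}} (legacy WebHDFS)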
diff --git a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/test/java/org/apache/hadoop/fs/http/client/BaseTestHttpFSWith.java b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/test/java/org/apache/hadoop/fs/http/client/BaseTestHttpFSWith.java
index 9ef24ec734..41dc03d59e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/test/java/org/apache/hadoop/fs/http/client/BaseTestHttpFSWith.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/test/java/org/apache/hadoop/fs/http/client/BaseTestHttpFSWith.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.fs.http.client;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.BlockStoragePolicySpi;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.ContentSummary;
@@ -73,6 +74,10 @@
 import org.apache.hadoop.test.TestJetty;
 import org.apache.hadoop.test.TestJettyHelper;
 import org.apache.hadoop.util.Lists;
+
+import org.json.simple.JSONObject;
+import org.json.simple.parser.ContainerFactory;
+import org.json.simple.parser.JSONParser;
 import org.junit.Assert;
 import org.junit.Assume;
 import org.junit.Test;
@@ -101,11 +106,11 @@
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 @RunWith(value = Parameterized.class)
 public abstract class BaseTestHttpFSWith extends HFSTestCase {
-
   protected abstract Path getProxiedFSTestDir();
 
   protected abstract String getProxiedFSURI();
@@ -191,7 +196,7 @@ protected FileSystem getHttpFSFileSystem() throws Exception {
 
   protected void testGet() throws Exception {
     FileSystem fs = getHttpFSFileSystem();
-    Assert.assertNotNull(fs);
+    assertNotNull(fs);
     URI uri = new URI(getScheme() + "://" +
         TestJettyHelper.getJettyURL().toURI().getAuthority());
     assertEquals(fs.getUri(), uri);
@@ -1201,7 +1206,7 @@ protected enum Operation {
     ALLOW_SNAPSHOT, DISALLOW_SNAPSHOT, DISALLOW_SNAPSHOT_EXCEPTION,
     FILE_STATUS_ATTR, GET_SNAPSHOT_DIFF, GET_SNAPSHOTTABLE_DIRECTORY_LIST,
     GET_SNAPSHOT_LIST, GET_SERVERDEFAULTS, CHECKACCESS, SETECPOLICY,
-    SATISFYSTORAGEPOLICY, GET_SNAPSHOT_DIFF_LISTING
+    SATISFYSTORAGEPOLICY, GET_SNAPSHOT_DIFF_LISTING, GETFILEBLOCKLOCATIONS
   }
 
   private void operation(Operation op) throws Exception {
@@ -1341,6 +1346,9 @@ private void operation(Operation op) throws Exception {
     case GET_SNAPSHOT_DIFF_LISTING:
       testGetSnapshotDiffListing();
       break;
+    case GETFILEBLOCKLOCATIONS:
+      testGetFileBlockLocations();
+      break;
     }
   }
 
@@ -1959,6 +1967,37 @@ public void testStoragePolicySatisfier() throws Exception {
     }
   }
 
+  private void testGetFileBlockLocations() throws Exception {
+    BlockLocation[] blockLocations;
+    Path testFile;
+    if (!this.isLocalFS()) {
+      FileSystem fs = this.getHttpFSFileSystem();
+      testFile = new Path(getProxiedFSTestDir(), "singleBlock.txt");
+      DFSTestUtil.createFile(fs, testFile, 1, (short) 1, 0L);
+      if (fs instanceof HttpFSFileSystem) {
+        HttpFSFileSystem httpFS = (HttpFSFileSystem) fs;
+        blockLocations = httpFS.getFileBlockLocations(testFile, 0, 1);
+        assertNotNull(blockLocations);
+
+        // verify HttpFSFileSystem.toBlockLocations()
+        String jsonString = JsonUtil.toJsonString(blockLocations);
+        JSONParser parser = new JSONParser();
+        JSONObject jsonObject = (JSONObject) parser.parse(jsonString, (ContainerFactory) null);
+        BlockLocation[] deserializedLocation = HttpFSFileSystem.toBlockLocations(jsonObject);
+        assertEquals(blockLocations.length, deserializedLocation.length);
+        for (int i = 0; i < blockLocations.length; i++) {
+          assertEquals(blockLocations[i].toString(), deserializedLocation[i].toString());
+        }
+      } else if (fs instanceof WebHdfsFileSystem) {
+        WebHdfsFileSystem webHdfsFileSystem = (WebHdfsFileSystem) fs;
+        blockLocations = webHdfsFileSystem.getFileBlockLocations(testFile, 0, 1);
+        assertNotNull(blockLocations);
+      } else {
+        Assert.fail(fs.getClass().getSimpleName() + " doesn't support access");
+      }
+    }
+  }
+
   private void testGetSnapshotDiffListing() throws Exception {
     if (!this.isLocalFS()) {
       // Create a directory with snapshot allowed
diff --git a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/test/java/org/apache/hadoop/fs/http/server/TestHttpFSServer.java b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/test/java/org/apache/hadoop/fs/http/server/TestHttpFSServer.java
index f584b56ebb..89efdb24ed 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/test/java/org/apache/hadoop/fs/http/server/TestHttpFSServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/test/java/org/apache/hadoop/fs/http/server/TestHttpFSServer.java
@@ -36,12 +36,14 @@
 import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.web.JsonUtil;
+import org.apache.hadoop.hdfs.web.JsonUtilClient;
 import org.apache.hadoop.lib.service.FileSystemAccess;
 import org.apache.hadoop.security.authentication.util.SignerSecretProvider;
 import org.apache.hadoop.security.authentication.util.StringSignerSecretProviderCreator;
 import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
 import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticator;
 import org.apache.hadoop.security.token.delegation.web.KerberosDelegationTokenAuthenticationHandler;
+import org.apache.hadoop.util.JsonSerialization;
 import org.json.simple.JSONArray;
 import org.junit.Assert;
@@ -70,6 +72,7 @@
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FsServerDefaults;
 import org.apache.hadoop.fs.Path;
@@ -2003,4 +2006,38 @@ public void testContentType() throws Exception {
         () -> HttpFSUtils.jsonParse(conn));
     conn.disconnect();
   }
+
+  @Test
+  @TestDir
+  @TestJetty
+  @TestHdfs
+  public void testGetFileBlockLocations() throws Exception {
+    createHttpFSServer(false, false);
+    // Create a test directory
+    String pathStr = "/tmp/tmp-get-block-location-test";
+    createDirWithHttp(pathStr, "700", null);
+
+    Path path = new Path(pathStr);
+    DistributedFileSystem dfs = (DistributedFileSystem) FileSystem
+        .get(path.toUri(), TestHdfsHelper.getHdfsConf());
+
+    String file1 = pathStr + "/file1";
+    createWithHttp(file1, null);
+    HttpURLConnection conn = sendRequestToHttpFSServer(file1,
+        "GETFILEBLOCKLOCATIONS", "length=10&offset=10");
+    Assert.assertEquals(HttpURLConnection.HTTP_OK, conn.getResponseCode());
+    BlockLocation[] locations1 = dfs.getFileBlockLocations(new Path(file1), 0, 1);
+    Assert.assertNotNull(locations1);
+
+    Map<?, ?> jsonMap = JsonSerialization.mapReader().readValue(conn.getInputStream());
+
+    BlockLocation[] httpfsBlockLocations = JsonUtilClient.toBlockLocationArray(jsonMap);
+
+    assertEquals(locations1.length, httpfsBlockLocations.length);
+    for (int i = 0; i < locations1.length; i++) {
+      assertEquals(locations1[i].toString(), httpfsBlockLocations[i].toString());
+    }
+
+    conn.getInputStream().close();
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtil.java
index 1744b3dde0..b91399e399 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtil.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.web;
 
+import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.FileChecksum;
@@ -335,6 +336,17 @@ public static String toJsonString(final LocatedBlocks locatedblocks
       return null;
     }
 
+    final Map<String, Object> m = toJsonMap(locatedblocks);
+    return toJsonString(LocatedBlocks.class, m);
+  }
+
+  /** Convert LocatedBlocks to a Map. */
+  public static Map<String, Object> toJsonMap(final LocatedBlocks locatedblocks)
+      throws IOException {
+    if (locatedblocks == null) {
+      return null;
+    }
+
     final Map<String, Object> m = new TreeMap<String, Object>();
     m.put("fileLength", locatedblocks.getFileLength());
     m.put("isUnderConstruction", locatedblocks.isUnderConstruction());
@@ -342,7 +354,7 @@ public static String toJsonString(final LocatedBlocks locatedblocks
     m.put("locatedBlocks", toJsonArray(locatedblocks.getLocatedBlocks()));
     m.put("lastLocatedBlock", toJsonMap(locatedblocks.getLastLocatedBlock()));
     m.put("isLastBlockComplete", locatedblocks.isLastBlockComplete());
-    return toJsonString(LocatedBlocks.class, m);
+    return m;
   }
 
   /** Convert a ContentSummary to a Json string. */
@@ -676,7 +688,8 @@ private static Object toJsonMap(
     return m;
   }
 
-  private static Map<String, Object> toJsonMap(
+  @VisibleForTesting
+  static Map<String, Object> toJsonMap(
       final BlockLocation blockLocation) throws IOException {
     if (blockLocation == null) {
       return null;
@@ -696,15 +709,20 @@ private static Map<String, Object> toJsonMap(
 
   public static String toJsonString(BlockLocation[] locations)
       throws IOException {
+    return toJsonString("BlockLocations", JsonUtil.toJsonMap(locations));
+  }
+
+  public static Map<String, Object> toJsonMap(BlockLocation[] locations)
+      throws IOException {
     if (locations == null) {
       return null;
     }
     final Map<String, Object> m = new HashMap<>();
     Object[] blockLocations = new Object[locations.length];
-    for(int i=0; i<locations.length; i++) {
+    for (int i = 0; i < locations.length; i++) {
       blockLocations[i] = toJsonMap(locations[i]);
     }
     m.put(BlockLocation.class.getSimpleName(), blockLocations);
-    return toJsonString("BlockLocations", m);
+    return m;
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestJsonUtilClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestJsonUtilClient.java
new file mode 100644
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestJsonUtilClient.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.web;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.util.JsonSerialization;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class TestJsonUtilClient {
+  @Test
+  public void testToStringArray() {
+    List<String> strList = new ArrayList<String>(Arrays.asList("aaa", "bbb", "ccc"));
+
+    String[] strArr = JsonUtilClient.toStringArray(strList);
+    assertEquals("Expected 3 items in the array", 3, strArr.length);
+    assertEquals("aaa", strArr[0]);
+    assertEquals("bbb", strArr[1]);
+    assertEquals("ccc", strArr[2]);
+  }
+
+  @Test
+  public void testToBlockLocationArray() throws Exception {
+    BlockLocation blockLocation = new BlockLocation(
+        new String[] {"127.0.0.1:62870"},
+        new String[] {"127.0.0.1"},
+        null,
+        new String[] {"/default-rack/127.0.0.1:62870"},
+        null,
+        new StorageType[] {StorageType.DISK},
+        0, 1, false);
+
+    Map<String, Object> blockLocationsMap =
+        JsonUtil.toJsonMap(new BlockLocation[] {blockLocation});
+    String json = JsonUtil.toJsonString("BlockLocations", blockLocationsMap);
+    assertNotNull(json);
+    Map<?, ?> jsonMap = JsonSerialization.mapReader().readValue(json);
+
+    BlockLocation[] deserializedBlockLocations =
+        JsonUtilClient.toBlockLocationArray(jsonMap);
+    assertEquals(1, deserializedBlockLocations.length);
+    assertEquals(blockLocation.toString(),
+        deserializedBlockLocations[0].toString());
+  }
+}