From 8a40953058d50d421d62b71067a13b626b3cba1f Mon Sep 17 00:00:00 2001
From: Andrew Wang
Date: Fri, 16 Sep 2016 15:37:36 -0700
Subject: [PATCH] HDFS-10823. Implement HttpFSFileSystem#listStatusIterator.

---
 .../java/org/apache/hadoop/fs/FileSystem.java | 125 +++++++++++++++---
 .../hadoop/fs/TestFilterFileSystem.java       |   1 +
 .../apache/hadoop/fs/TestHarFileSystem.java   |   1 +
 .../hadoop/hdfs/web/WebHdfsFileSystem.java    |  70 +++-------
 .../fs/http/client/HttpFSFileSystem.java      |  56 ++++++--
 .../hadoop/fs/http/client/HttpFSUtils.java    |   2 +
 .../hadoop/fs/http/server/FSOperations.java   |  62 +++++++++
 .../http/server/HttpFSParametersProvider.java |  20 +++
 .../hadoop/fs/http/server/HttpFSServer.java   |  17 +++
 .../hadoop/FileSystemAccessService.java       |   4 +-
 .../fs/http/client/BaseTestHttpFSWith.java    |  62 ++++++++-
 11 files changed, 341 insertions(+), 79 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
index 9bde29da3d..5939f974a1 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
@@ -73,6 +73,7 @@
 import org.apache.htrace.core.Tracer;
 import org.apache.htrace.core.TraceScope;
 
+import com.google.common.base.Preconditions;
 import com.google.common.annotations.VisibleForTesting;
 
 import static com.google.common.base.Preconditions.checkArgument;
@@ -1530,7 +1531,68 @@ public boolean accept(Path file) {
    */
   public abstract FileStatus[] listStatus(Path f) throws FileNotFoundException,
                                                          IOException;
-  
+
+  /**
+   * Represents a batch of directory entries when iteratively listing a
+   * directory. This is a private API not meant for use by end users.
+   * <p>
+   * For internal use by FileSystem subclasses that override
+   * {@link FileSystem#listStatusBatch(Path, byte[])} to implement iterative
+   * listing.
+   */
+  @InterfaceAudience.Private
+  public static class DirectoryEntries {
+    private final FileStatus[] entries;
+    private final byte[] token;
+    private final boolean hasMore;
+
+    public DirectoryEntries(FileStatus[] entries, byte[] token, boolean
+        hasMore) {
+      this.entries = entries;
+      if (token != null) {
+        this.token = token.clone();
+      } else {
+        this.token = null;
+      }
+      this.hasMore = hasMore;
+    }
+
+    public FileStatus[] getEntries() {
+      return entries;
+    }
+
+    public byte[] getToken() {
+      return token;
+    }
+
+    public boolean hasMore() {
+      return hasMore;
+    }
+  }
+
+  /**
+   * Given an opaque iteration token, return the next batch of entries in a
+   * directory. This is a private API not meant for use by end users.
+   * <p>
+   * This method should be overridden by FileSystem subclasses that want to
+   * use the generic {@link FileSystem#listStatusIterator(Path)} implementation.
+   * @param f Path to list
+   * @param token opaque iteration token returned by previous call, or null
+   *              if this is the first call.
+   * @return the next batch of entries, plus a token and a has-more flag.
+   * @throws FileNotFoundException if the path does not exist
+   * @throws IOException if an I/O error occurs
+   */
+  @InterfaceAudience.Private
+  protected DirectoryEntries listStatusBatch(Path f, byte[] token) throws
+      FileNotFoundException, IOException {
+    // The default implementation returns the entire listing as a single batch.
+    // Thus, there is never a second batch, and no need to respect the passed
+    // token or set a token in the returned DirectoryEntries.
+    FileStatus[] listing = listStatus(f);
+    return new DirectoryEntries(listing, null, false);
+  }
+
   /*
    * Filter files/directories in the given path using the user-supplied path
    * filter. Results are added to the given array results.
@@ -1766,6 +1828,49 @@ public LocatedFileStatus next() throws IOException {
     };
   }
 
+  /**
+   * Generic iterator for implementing {@link #listStatusIterator(Path)}.
+   */
+  private class DirListingIterator<T extends FileStatus> implements
+      RemoteIterator<T> {
+
+    private final Path path;
+    private DirectoryEntries entries;
+    private int i = 0;
+
+    DirListingIterator(Path path) {
+      this.path = path;
+    }
+
+    @Override
+    public boolean hasNext() throws IOException {
+      if (entries == null) {
+        fetchMore();
+      }
+      return i < entries.getEntries().length ||
+          entries.hasMore();
+    }
+
+    private void fetchMore() throws IOException {
+      byte[] token = null;
+      if (entries != null) {
+        token = entries.getToken();
+      }
+      entries = listStatusBatch(path, token);
+      i = 0;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public T next() throws IOException {
+      Preconditions.checkState(hasNext(), "No more items in iterator");
+      if (i == entries.getEntries().length) {
+        fetchMore();
+      }
+      return (T)entries.getEntries()[i++];
+    }
+  }
+
   /**
    * Returns a remote iterator so that followup calls are made on demand
    * while consuming the entries. Each file system implementation should
@@ -1779,23 +1884,7 @@ public LocatedFileStatus next() throws IOException {
    */
   public RemoteIterator<FileStatus> listStatusIterator(final Path p)
   throws FileNotFoundException, IOException {
-    return new RemoteIterator<FileStatus>() {
-      private final FileStatus[] stats = listStatus(p);
-      private int i = 0;
-
-      @Override
-      public boolean hasNext() {
-        return i<stats.length;
-      }
-
-      @Override
-      public FileStatus next() throws IOException {
-        if (!hasNext()) {
-          throw new NoSuchElementException("No more entry in " + p);
-        }
-        return stats[i++];
-      }
-    };
+    return new DirListingIterator<>(p);
   }
 
   /**
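
From the caller's side, the effect of the new machinery is that large directories are paged transparently: each hasNext()/next() pair fetches a new batch only when the current one is exhausted. A minimal sketch of a consumer (assuming a configured default filesystem; the path is made up):

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.RemoteIterator;

    public class ListDirExample {
      public static void main(String[] args) throws IOException {
        FileSystem fs = FileSystem.get(new Configuration());
        // hasNext() may itself call listStatusBatch() to pull the next batch,
        // which is why it can throw IOException (unlike java.util.Iterator).
        RemoteIterator<FileStatus> it =
            fs.listStatusIterator(new Path("/user/example"));
        while (it.hasNext()) {
          System.out.println(it.next().getPath());
        }
      }
    }
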
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFilterFileSystem.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFilterFileSystem.java
index e1312bc00b..24f3dc8b7f 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFilterFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFilterFileSystem.java
@@ -103,6 +103,7 @@ public FSDataOutputStream create(Path f, FsPermission permission,
     public void processDeleteOnExit();
     public FsStatus getStatus();
     public FileStatus[] listStatus(Path f, PathFilter filter);
+    public FileStatus[] listStatusBatch(Path f, byte[] token);
     public FileStatus[] listStatus(Path[] files);
     public FileStatus[] listStatus(Path[] files, PathFilter filter);
     public FileStatus[] globStatus(Path pathPattern);
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java
index d2020b9b72..bacdbb73e4 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java
@@ -115,6 +115,7 @@ public FSDataOutputStream create(Path f, FsPermission permission,
     public QuotaUsage getQuotaUsage(Path f);
     public FsStatus getStatus();
     public FileStatus[] listStatus(Path f, PathFilter filter);
+    public FileStatus[] listStatusBatch(Path f, byte[] token);
     public FileStatus[] listStatus(Path[] files);
     public FileStatus[] listStatus(Path[] files, PathFilter filter);
     public FileStatus[] globStatus(Path pathPattern);
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
index 9a9edc8462..19de5b5195 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
@@ -64,7 +64,6 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.GlobalStorageStatistics;
 import org.apache.hadoop.fs.GlobalStorageStatistics.StorageStatisticsProvider;
-import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.StorageStatistics;
 import org.apache.hadoop.fs.permission.FsCreateModes;
 import org.apache.hadoop.hdfs.DFSOpsCountStatistics;
@@ -1504,55 +1503,30 @@ FileStatus[] decodeResponse(Map<?,?> json) {
   }
 
   private static final byte[] EMPTY_ARRAY = new byte[] {};
-  private class DirListingIterator<T extends FileStatus> implements
-      RemoteIterator<T> {
-
-    private final Path path;
-    private DirectoryListing thisListing;
-    private int i = 0;
-    private byte[] prevKey = EMPTY_ARRAY;
-
-    DirListingIterator(Path path) {
-      this.path = path;
-    }
-
-    @Override
-    public boolean hasNext() throws IOException {
-      if (thisListing == null) {
-        fetchMore();
-      }
-      return i < thisListing.getPartialListing().length ||
-          thisListing.hasMore();
-    }
-
-    private void fetchMore() throws IOException {
-      thisListing = new FsPathResponseRunner<DirectoryListing>(
-          GetOpParam.Op.LISTSTATUS_BATCH,
-          path, new StartAfterParam(new String(prevKey, Charsets.UTF_8))) {
-        @Override
-        DirectoryListing decodeResponse(Map<?,?> json) throws IOException {
-          return JsonUtilClient.toDirectoryListing(json);
-        }
-      }.run();
-      i = 0;
-      prevKey = thisListing.getLastName();
-    }
-
-    @Override
-    @SuppressWarnings("unchecked")
-    public T next() throws IOException {
-      Preconditions.checkState(hasNext(), "No more items in iterator");
-      if (i == thisListing.getPartialListing().length) {
-        fetchMore();
-      }
-      return (T)makeQualified(thisListing.getPartialListing()[i++], path);
-    }
-  }
 
   @Override
-  public RemoteIterator<FileStatus> listStatusIterator(final Path f)
-      throws FileNotFoundException, IOException {
-    return new DirListingIterator<>(f);
+  public DirectoryEntries listStatusBatch(Path f, byte[] token) throws
+      FileNotFoundException, IOException {
+    byte[] prevKey = EMPTY_ARRAY;
+    if (token != null) {
+      prevKey = token;
+    }
+    DirectoryListing listing = new FsPathResponseRunner<DirectoryListing>(
+        GetOpParam.Op.LISTSTATUS_BATCH,
+        f, new StartAfterParam(new String(prevKey, Charsets.UTF_8))) {
+      @Override
+      DirectoryListing decodeResponse(Map<?,?> json) throws IOException {
+        return JsonUtilClient.toDirectoryListing(json);
+      }
+    }.run();
+    // Qualify the returned FileStatus array
+    final HdfsFileStatus[] statuses = listing.getPartialListing();
+    FileStatus[] qualified = new FileStatus[statuses.length];
+    for (int i = 0; i < statuses.length; i++) {
+      qualified[i] = makeQualified(statuses[i], f);
+    }
+    return new DirectoryEntries(qualified, listing.getLastName(),
+        listing.hasMore());
   }
 
   @Override
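
The token here is just the byte encoding of the last returned name (DirectoryListing#getLastName()), which the next call sends back as the startAfter query parameter; the NameNode sizes each batch according to dfs.ls.limit. Since the override is public, the paging can also be driven by hand. A sketch, assuming a reachable cluster (the webhdfs URI and directory are placeholders):

    import java.io.IOException;
    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.FileSystem.DirectoryEntries;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;

    public class ManualPagingExample {
      public static void main(String[] args) throws IOException {
        WebHdfsFileSystem fs = (WebHdfsFileSystem) FileSystem.get(
            URI.create("webhdfs://namenode.example.com:50070"),
            new Configuration());
        Path dir = new Path("/tmp");
        byte[] token = null;   // null requests the first batch
        DirectoryEntries batch;
        do {
          // One LISTSTATUS_BATCH round trip per call; the returned token is
          // the last name in the batch and seeds the next startAfter.
          batch = fs.listStatusBatch(dir, token);
          for (FileStatus status : batch.getEntries()) {
            System.out.println(status.getPath().getName());
          }
          token = batch.getToken();
        } while (batch.hasMore());
      }
    }
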
diff --git a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/HttpFSFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/HttpFSFileSystem.java
index 0f97d90ea2..77e332380b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/HttpFSFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/HttpFSFileSystem.java
@@ -20,6 +20,8 @@
 import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.List;
+
+import com.google.common.base.Charsets;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ContentSummary;
@@ -111,6 +113,7 @@ public class HttpFSFileSystem extends FileSystem
   public static final String XATTR_SET_FLAG_PARAM = "flag";
   public static final String XATTR_ENCODING_PARAM = "encoding";
   public static final String NEW_LENGTH_PARAM = "newlength";
+  public static final String START_AFTER_PARAM = "startAfter";
 
   public static final Short DEFAULT_PERMISSION = 0755;
   public static final String ACLSPEC_DEFAULT = "";
@@ -184,6 +187,10 @@ public static FILE_TYPE getType(FileStatus fileStatus) {
 
   public static final String ENC_BIT_JSON = "encBit";
 
+  public static final String DIRECTORY_LISTING_JSON = "DirectoryListing";
+  public static final String PARTIAL_LISTING_JSON = "partialListing";
+  public static final String REMAINING_ENTRIES_JSON = "remainingEntries";
+
   public static final int HTTP_TEMPORARY_REDIRECT = 307;
 
   private static final String HTTP_GET = "GET";
@@ -203,7 +210,7 @@ public static enum Operation {
     MODIFYACLENTRIES(HTTP_PUT), REMOVEACLENTRIES(HTTP_PUT),
     REMOVEDEFAULTACL(HTTP_PUT), REMOVEACL(HTTP_PUT), SETACL(HTTP_PUT),
     DELETE(HTTP_DELETE), SETXATTR(HTTP_PUT), GETXATTRS(HTTP_GET),
-    REMOVEXATTR(HTTP_PUT), LISTXATTRS(HTTP_GET);
+    REMOVEXATTR(HTTP_PUT), LISTXATTRS(HTTP_GET), LISTSTATUS_BATCH(HTTP_GET);
 
     private String httpMethod;
 
@@ -666,6 +673,17 @@ public boolean delete(Path f, boolean recursive) throws IOException {
     return (Boolean) json.get(DELETE_JSON);
   }
 
+  private FileStatus[] toFileStatuses(JSONObject json, Path f) {
+    json = (JSONObject) json.get(FILE_STATUSES_JSON);
+    JSONArray jsonArray = (JSONArray) json.get(FILE_STATUS_JSON);
+    FileStatus[] array = new FileStatus[jsonArray.size()];
+    f = makeQualified(f);
+    for (int i = 0; i < jsonArray.size(); i++) {
+      array[i] = createFileStatus(f, (JSONObject) jsonArray.get(i));
+    }
+    return array;
+  }
+
   /**
    * List the statuses of the files/directories in the given path if the path is
    * a directory.
@@ -684,14 +702,36 @@ public FileStatus[] listStatus(Path f) throws IOException {
         params, f, true);
     HttpExceptionUtils.validateResponse(conn, HttpURLConnection.HTTP_OK);
     JSONObject json = (JSONObject) HttpFSUtils.jsonParse(conn);
-    json = (JSONObject) json.get(FILE_STATUSES_JSON);
-    JSONArray jsonArray = (JSONArray) json.get(FILE_STATUS_JSON);
-    FileStatus[] array = new FileStatus[jsonArray.size()];
-    f = makeQualified(f);
-    for (int i = 0; i < jsonArray.size(); i++) {
-      array[i] = createFileStatus(f, (JSONObject) jsonArray.get(i));
+    return toFileStatuses(json, f);
+  }
+
+  @Override
+  public DirectoryEntries listStatusBatch(Path f, byte[] token) throws
+      FileNotFoundException, IOException {
+    Map<String, String> params = new HashMap<String, String>();
+    params.put(OP_PARAM, Operation.LISTSTATUS_BATCH.toString());
+    if (token != null) {
+      params.put(START_AFTER_PARAM, new String(token, Charsets.UTF_8));
     }
-    return array;
+    HttpURLConnection conn = getConnection(
+        Operation.LISTSTATUS_BATCH.getMethod(),
+        params, f, true);
+    HttpExceptionUtils.validateResponse(conn, HttpURLConnection.HTTP_OK);
+    // Parse the FileStatus array
+    JSONObject json = (JSONObject) HttpFSUtils.jsonParse(conn);
+    JSONObject listing = (JSONObject) json.get(DIRECTORY_LISTING_JSON);
+    FileStatus[] statuses = toFileStatuses(
+        (JSONObject) listing.get(PARTIAL_LISTING_JSON), f);
+    // New token is the last FileStatus entry
+    byte[] newToken = null;
+    if (statuses.length > 0) {
+      newToken = statuses[statuses.length - 1].getPath().getName()
+          .getBytes(Charsets.UTF_8);
+    }
+    // Parse the remainingEntries count into the hasMore flag
+    final long remainingEntries = (Long) listing.get(REMAINING_ENTRIES_JSON);
+    final boolean hasMore = remainingEntries > 0;
+    return new DirectoryEntries(statuses, newToken, hasMore);
   }
 
   /**
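
The parsing code above implies a response shape: the usual FileStatuses payload nested under DirectoryListing/partialListing, plus a numeric remainingEntries. A self-contained sketch of that shape, exercised with the same json-simple calls the client uses (all field values are invented):

    import org.json.simple.JSONArray;
    import org.json.simple.JSONObject;
    import org.json.simple.parser.JSONParser;
    import org.json.simple.parser.ParseException;

    public class ListingShapeExample {
      public static void main(String[] args) throws ParseException {
        // A response in the shape listStatusBatch() expects; values made up.
        String body = "{\"DirectoryListing\":{"
            + "\"partialListing\":{\"FileStatuses\":{\"FileStatus\":["
            + "{\"pathSuffix\":\"file0\",\"type\":\"FILE\"},"
            + "{\"pathSuffix\":\"file1\",\"type\":\"FILE\"}]}},"
            + "\"remainingEntries\":8}}";
        JSONObject json = (JSONObject) new JSONParser().parse(body);
        JSONObject listing = (JSONObject) json.get("DirectoryListing");
        JSONObject statuses = (JSONObject) ((JSONObject)
            listing.get("partialListing")).get("FileStatuses");
        JSONArray array = (JSONArray) statuses.get("FileStatus");
        long remaining = (Long) listing.get("remainingEntries");
        System.out.println(array.size() + " entries, " + remaining
            + " remaining");
      }
    }
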
diff --git a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/HttpFSUtils.java b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/HttpFSUtils.java
index 95e26d799f..fcc7bab15e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/HttpFSUtils.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/HttpFSUtils.java
@@ -43,6 +43,8 @@ public class HttpFSUtils {
 
   public static final String SERVICE_VERSION = "/v1";
 
+  public static final byte[] EMPTY_BYTES = {};
+
   private static final String SERVICE_PATH = SERVICE_NAME + SERVICE_VERSION;
 
   /**
diff --git a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/FSOperations.java b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/FSOperations.java
index 39597ebb12..46948f96b1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/FSOperations.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/FSOperations.java
@@ -22,6 +22,7 @@
 import org.apache.hadoop.fs.FileChecksum;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FilterFileSystem;
 import org.apache.hadoop.fs.GlobFilter;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
@@ -37,6 +38,7 @@
 import org.json.simple.JSONArray;
 import org.json.simple.JSONObject;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -108,6 +110,27 @@ private static Map toJsonInner(FileStatus fileStatus,
     return json;
   }
 
+  /**
+   * Serializes a DirectoryEntries object into the JSON for a
+   * WebHDFS {@link org.apache.hadoop.hdfs.protocol.DirectoryListing}.
+   * <p>
+   * These two classes are slightly different, due to the impedance
+   * mismatches between the WebHDFS and FileSystem APIs.
+   * @param entries the batch of directory entries to serialize
+   * @return a JSON map in DirectoryListing format
+   */
+  private static Map toJson(FileSystem.DirectoryEntries
+      entries) {
+    Map json = new LinkedHashMap<>();
+    Map inner = new LinkedHashMap<>();
+    Map fileStatuses = toJson(entries.getEntries());
+    inner.put(HttpFSFileSystem.PARTIAL_LISTING_JSON, fileStatuses);
+    inner.put(HttpFSFileSystem.REMAINING_ENTRIES_JSON, entries.hasMore() ? 1
+        : 0);
+    json.put(HttpFSFileSystem.DIRECTORY_LISTING_JSON, inner);
+    return json;
+  }
+
   /** Converts an AclStatus object into a JSON object.
    *
    * @param aclStatus AclStatus object
@@ -624,6 +647,45 @@ public boolean accept(Path path) {
 
   }
 
+  /**
+   * Executor that performs a batched directory listing.
+   */
+  @InterfaceAudience.Private
+  public static class FSListStatusBatch implements FileSystemAccess
+      .FileSystemExecutor<Map> {
+    private final Path path;
+    private final byte[] token;
+
+    public FSListStatusBatch(String path, byte[] token) throws IOException {
+      this.path = new Path(path);
+      this.token = token.clone();
+    }
+
+    /**
+     * Simple wrapper filesystem that exposes the protected batched
+     * listStatus API so we can use it.
+     */
+    private static class WrappedFileSystem extends FilterFileSystem {
+      public WrappedFileSystem(FileSystem f) {
+        super(f);
+      }
+
+      @Override
+      public DirectoryEntries listStatusBatch(Path f, byte[] token) throws
+          FileNotFoundException, IOException {
+        return super.listStatusBatch(f, token);
+      }
+    }
+
+    @Override
+    public Map execute(FileSystem fs) throws IOException {
+      WrappedFileSystem wrappedFS = new WrappedFileSystem(fs);
+      FileSystem.DirectoryEntries entries =
+          wrappedFS.listStatusBatch(path, token);
+      return toJson(entries);
+    }
+  }
+
   /**
    * Executor that performs a mkdirs FileSystemAccess files system operation.
    */
diff --git a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/HttpFSParametersProvider.java b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/HttpFSParametersProvider.java
index 5c4204a237..25585c51fd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/HttpFSParametersProvider.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/HttpFSParametersProvider.java
@@ -91,6 +91,8 @@ public class HttpFSParametersProvider extends ParametersProvider {
     PARAMS_DEF.put(Operation.GETXATTRS,
         new Class[]{XAttrNameParam.class, XAttrEncodingParam.class});
     PARAMS_DEF.put(Operation.LISTXATTRS, new Class[]{});
+    PARAMS_DEF.put(Operation.LISTSTATUS_BATCH,
+        new Class[]{StartAfterParam.class});
   }
 
   public HttpFSParametersProvider() {
@@ -520,4 +522,22 @@ public XAttrEncodingParam() {
       super(NAME, XAttrCodec.class, null);
     }
   }
+
+  /**
+   * Class for the startAfter parameter.
+   */
+  @InterfaceAudience.Private
+  public static class StartAfterParam extends StringParam {
+    /**
+     * Parameter name.
+     */
+    public static final String NAME = HttpFSFileSystem.START_AFTER_PARAM;
+
+    /**
+     * Constructor.
+     */
+    public StartAfterParam() {
+      super(NAME, null);
+    }
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/HttpFSServer.java b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/HttpFSServer.java
index db4692a367..a4db124c05 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/HttpFSServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/HttpFSServer.java
@@ -18,12 +18,14 @@
 
 package org.apache.hadoop.fs.http.server;
 
+import com.google.common.base.Charsets;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.XAttrCodec;
 import org.apache.hadoop.fs.XAttrSetFlag;
 import org.apache.hadoop.fs.http.client.HttpFSFileSystem;
+import org.apache.hadoop.fs.http.client.HttpFSUtils;
 import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.AccessTimeParam;
 import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.AclPermissionParam;
 import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.BlockSizeParam;
@@ -320,6 +322,21 @@ public InputStream run() throws Exception {
       response = Response.ok(json).type(MediaType.APPLICATION_JSON).build();
       break;
     }
+    case LISTSTATUS_BATCH: {
+      String startAfter = params.get(
+          HttpFSParametersProvider.StartAfterParam.NAME,
+          HttpFSParametersProvider.StartAfterParam.class);
+      byte[] token = HttpFSUtils.EMPTY_BYTES;
+      if (startAfter != null) {
+        token = startAfter.getBytes(Charsets.UTF_8);
+      }
+      FSOperations.FSListStatusBatch command = new FSOperations
+          .FSListStatusBatch(path, token);
+      @SuppressWarnings("rawtypes") Map json = fsExecute(user, command);
+      AUDIT_LOG.info("[{}] token [{}]", path, startAfter);
+      response = Response.ok(json).type(MediaType.APPLICATION_JSON).build();
+      break;
+    }
     default: {
       throw new IOException(
           MessageFormat.format("Invalid HTTP GET operation [{0}]", op.value()));
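
With the parameter class and the server case wired up, the operation is reachable with a plain HTTP GET against the HttpFS endpoint. A hedged sketch (host and user are placeholders, 14000 is the customary HttpFS port, and pseudo authentication via user.name is assumed):

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.net.HttpURLConnection;
    import java.net.URL;

    public class HttpFSRestExample {
      public static void main(String[] args) throws IOException {
        // Second page of /tmp: startAfter carries the last pathSuffix of the
        // previous page, exactly as HttpFSFileSystem#listStatusBatch sends it.
        URL url = new URL("http://httpfs.example.com:14000/webhdfs/v1/tmp"
            + "?op=LISTSTATUS_BATCH&user.name=alice&startAfter=file3");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        try (BufferedReader in = new BufferedReader(
            new InputStreamReader(conn.getInputStream(), "UTF-8"))) {
          String line;
          while ((line = in.readLine()) != null) {
            System.out.println(line);  // DirectoryListing JSON, as sketched
          }
        } finally {
          conn.disconnect();
        }
      }
    }
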
diff --git a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/service/hadoop/FileSystemAccessService.java b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/service/hadoop/FileSystemAccessService.java
index 88780cbeba..0b767bec6d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/service/hadoop/FileSystemAccessService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/service/hadoop/FileSystemAccessService.java
@@ -84,7 +84,7 @@ public CachedFileSystem(long timeout) {
       count = 0;
     }
 
-    synchronized FileSystem getFileSytem(Configuration conf)
+    synchronized FileSystem getFileSystem(Configuration conf)
       throws IOException {
       if (fs == null) {
         fs = FileSystem.get(conf);
@@ -290,7 +290,7 @@ protected FileSystem createFileSystem(Configuration namenodeConf)
     }
     Configuration conf = new Configuration(namenodeConf);
     conf.set(HTTPFS_FS_USER, user);
-    return cachedFS.getFileSytem(conf);
+    return cachedFS.getFileSystem(conf);
   }
 
   protected void closeFileSystem(FileSystem fs) throws IOException {
diff --git a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/test/java/org/apache/hadoop/fs/http/client/BaseTestHttpFSWith.java b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/test/java/org/apache/hadoop/fs/http/client/BaseTestHttpFSWith.java
index aea6cf80b5..e475803a25 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/test/java/org/apache/hadoop/fs/http/client/BaseTestHttpFSWith.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/test/java/org/apache/hadoop/fs/http/client/BaseTestHttpFSWith.java
@@ -26,6 +26,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileSystemTestHelper;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.http.server.HttpFSServerWebApp;
 import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.AclStatus;
@@ -44,6 +45,7 @@
 import org.apache.hadoop.test.TestJetty;
 import org.apache.hadoop.test.TestJettyHelper;
 import org.junit.Assert;
+import org.junit.Assume;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -62,6 +64,7 @@
 import java.net.URI;
 import java.net.URL;
 import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
@@ -136,14 +139,19 @@ protected String getScheme() {
     return "webhdfs";
   }
 
-  protected FileSystem getHttpFSFileSystem() throws Exception {
-    Configuration conf = new Configuration();
+  protected FileSystem getHttpFSFileSystem(Configuration conf) throws
+      Exception {
     conf.set("fs.webhdfs.impl", getFileSystemClass().getName());
     URI uri = new URI(getScheme() + "://" +
                       TestJettyHelper.getJettyURL().toURI().getAuthority());
     return FileSystem.get(uri, conf);
   }
 
+  protected FileSystem getHttpFSFileSystem() throws Exception {
+    Configuration conf = new Configuration();
+    return getHttpFSFileSystem(conf);
+  }
+
   protected void testGet() throws Exception {
     FileSystem fs = getHttpFSFileSystem();
     Assert.assertNotNull(fs);
@@ -355,6 +363,51 @@ private void testListStatus() throws Exception {
     assertEquals(stati[0].getPath().getName(), path.getName());
   }
 
+  private static void assertSameListing(FileSystem expected, FileSystem
+      actual, Path p) throws IOException {
+    // Consume all the entries from both iterators
+    RemoteIterator<FileStatus> exIt = expected.listStatusIterator(p);
+    List<FileStatus> exStatuses = new ArrayList<>();
+    while (exIt.hasNext()) {
+      exStatuses.add(exIt.next());
+    }
+    RemoteIterator<FileStatus> acIt = actual.listStatusIterator(p);
+    List<FileStatus> acStatuses = new ArrayList<>();
+    while (acIt.hasNext()) {
+      acStatuses.add(acIt.next());
+    }
+    assertEquals(exStatuses.size(), acStatuses.size());
+    for (int i = 0; i < exStatuses.size(); i++) {
+      FileStatus expectedStatus = exStatuses.get(i);
+      FileStatus actualStatus = acStatuses.get(i);
+      // Path URIs are fully qualified, so compare just the path component
+      assertEquals(expectedStatus.getPath().toUri().getPath(),
+          actualStatus.getPath().toUri().getPath());
+    }
+  }
+
+  private void testListStatusBatch() throws Exception {
+    // LocalFileSystem writes checksum files next to the data files, which
+    // show up when listing via LFS. This makes the listings not compare
+    // properly.
+    Assume.assumeFalse(isLocalFS());
+
+    FileSystem proxyFs = FileSystem.get(getProxiedFSConf());
+    Configuration conf = new Configuration();
+    conf.setInt(DFSConfigKeys.DFS_LIST_LIMIT, 2);
+    FileSystem httpFs = getHttpFSFileSystem(conf);
+
+    // Test an empty directory
+    Path dir = new Path(getProxiedFSTestDir(), "dir");
+    proxyFs.mkdirs(dir);
+    assertSameListing(proxyFs, httpFs, dir);
+    // Create and test in a loop
+    for (int i = 0; i < 10; i++) {
+      proxyFs.create(new Path(dir, "file" + i)).close();
+      assertSameListing(proxyFs, httpFs, dir);
+    }
+  }
+
   private void testWorkingdirectory() throws Exception {
     FileSystem fs = FileSystem.get(getProxiedFSConf());
     Path workingDir = fs.getWorkingDirectory();
@@ -863,7 +916,7 @@ protected enum Operation {
     GET, OPEN, CREATE, APPEND, TRUNCATE, CONCAT, RENAME, DELETE, LIST_STATUS,
     WORKING_DIRECTORY, MKDIRS, SET_TIMES, SET_PERMISSION, SET_OWNER,
     SET_REPLICATION, CHECKSUM, CONTENT_SUMMARY, FILEACLS, DIRACLS, SET_XATTR,
-    GET_XATTRS, REMOVE_XATTR, LIST_XATTRS, ENCRYPTION
+    GET_XATTRS, REMOVE_XATTR, LIST_XATTRS, ENCRYPTION, LIST_STATUS_BATCH
   }
 
   private void operation(Operation op) throws Exception {
@@ -940,6 +993,9 @@ private void operation(Operation op) throws Exception {
     case ENCRYPTION:
       testEncryption();
       break;
+    case LIST_STATUS_BATCH:
+      testListStatusBatch();
+      break;
     }
   }