diff --git a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/HttpFSFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/HttpFSFileSystem.java
index c64cfd0879..345b58a462 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/HttpFSFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/HttpFSFileSystem.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.fs.http.client;
 
+import java.util.ArrayList;
+import java.util.List;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ContentSummary;
@@ -86,6 +88,7 @@ public class HttpFSFileSystem extends FileSystem
   public static final String PERMISSION_PARAM = "permission";
   public static final String DESTINATION_PARAM = "destination";
   public static final String RECURSIVE_PARAM = "recursive";
+  public static final String SOURCES_PARAM = "sources";
   public static final String OWNER_PARAM = "owner";
   public static final String GROUP_PARAM = "group";
   public static final String MODIFICATION_TIME_PARAM = "modificationtime";
@@ -167,7 +170,7 @@ public static enum Operation {
     GETHOMEDIRECTORY(HTTP_GET), GETCONTENTSUMMARY(HTTP_GET),
     GETFILECHECKSUM(HTTP_GET), GETFILEBLOCKLOCATIONS(HTTP_GET),
     INSTRUMENTATION(HTTP_GET),
-    APPEND(HTTP_POST),
+    APPEND(HTTP_POST), CONCAT(HTTP_POST),
     CREATE(HTTP_PUT), MKDIRS(HTTP_PUT), RENAME(HTTP_PUT), SETOWNER(HTTP_PUT),
     SETPERMISSION(HTTP_PUT), SETREPLICATION(HTTP_PUT), SETTIMES(HTTP_PUT),
     DELETE(HTTP_DELETE);
@@ -528,6 +531,29 @@ public FSDataOutputStream append(Path f, int bufferSize,
         HttpURLConnection.HTTP_OK);
   }
 
+  /**
+   * Concat existing files together.
+   * @param f the path to the target destination.
+   * @param psrcs the paths to the sources to use for the concatenation.
+   *
+   * @throws IOException
+   */
+  @Override
+  public void concat(Path f, Path[] psrcs) throws IOException {
+    List<String> strPaths = new ArrayList<String>(psrcs.length);
+    for(Path psrc : psrcs) {
+      strPaths.add(psrc.toUri().getPath());
+    }
+    String srcs = StringUtils.join(",", strPaths);
+
+    Map<String, String> params = new HashMap<String, String>();
+    params.put(OP_PARAM, Operation.CONCAT.toString());
+    params.put(SOURCES_PARAM, srcs);
+    HttpURLConnection conn = getConnection(Operation.CONCAT.getMethod(),
+        params, f, true);
+    HttpFSUtils.validateResponse(conn, HttpURLConnection.HTTP_OK);
+  }
+
   /**
    * Renames Path src to Path dst. Can take place on local fs
    * or remote DFS.
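The client half of the change is a thin wrapper: FileSystem.concat becomes a single POST carrying the comma-joined source paths. A minimal usage sketch of the new client API (not part of the patch; the host, port, and paths are hypothetical, and the webhdfs:// scheme is bound to HttpFSFileSystem the same way the HttpFS test harness does it):

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.http.client.HttpFSFileSystem;

    public class ConcatClientSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Bind the webhdfs:// scheme to the HttpFS client, as the tests do.
        conf.set("fs.webhdfs.impl", HttpFSFileSystem.class.getName());
        FileSystem fs =
            FileSystem.get(URI.create("webhdfs://httpfs-host:14000"), conf);

        Path target = new Path("/user/alice/part-00000");
        Path[] sources = new Path[] {
            new Path("/user/alice/part-00001"),
            new Path("/user/alice/part-00002")
        };
        // Issues POST /webhdfs/v1/user/alice/part-00000?op=CONCAT
        //        &sources=/user/alice/part-00001,/user/alice/part-00002
        fs.concat(target, sources);
        fs.close();
      }
    }
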
diff --git a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/FSOperations.java b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/FSOperations.java
index f81e90e064..8e41d04e44 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/FSOperations.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/FSOperations.java
@@ -198,6 +198,47 @@ public Void execute(FileSystem fs) throws IOException {
 
   }
 
+  /**
+   * Executor that performs a concat FileSystemAccess files system operation.
+   */
+  @InterfaceAudience.Private
+  public static class FSConcat implements FileSystemAccess.FileSystemExecutor<Void> {
+    private Path path;
+    private Path[] sources;
+
+    /**
+     * Creates a Concat executor.
+     *
+     * @param path target path to concat to.
+     * @param sources comma separated absolute paths to use as sources.
+     */
+    public FSConcat(String path, String[] sources) {
+      this.sources = new Path[sources.length];
+
+      for(int i = 0; i < sources.length; i++) {
+        this.sources[i] = new Path(sources[i]);
+      }
+
+      this.path = new Path(path);
+    }
+
+    /**
+     * Executes the filesystem operation.
+     *
+     * @param fs filesystem instance to use.
+     *
+     * @return void.
+     *
+     * @throws IOException thrown if an IO error occurred.
+     */
+    @Override
+    public Void execute(FileSystem fs) throws IOException {
+      fs.concat(path, sources);
+      return null;
+    }
+
+  }
+
   /**
    * Executor that performs a content-summary FileSystemAccess files system operation.
    */
diff --git a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/HttpFSParametersProvider.java b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/HttpFSParametersProvider.java
index b2a28053da..d217322b6a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/HttpFSParametersProvider.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/HttpFSParametersProvider.java
@@ -58,6 +58,7 @@ public class HttpFSParametersProvider extends ParametersProvider {
     PARAMS_DEF.put(Operation.INSTRUMENTATION, new Class[]{DoAsParam.class});
     PARAMS_DEF.put(Operation.APPEND,
       new Class[]{DoAsParam.class, DataParam.class});
+    PARAMS_DEF.put(Operation.CONCAT, new Class[]{SourcesParam.class});
     PARAMS_DEF.put(Operation.CREATE,
       new Class[]{DoAsParam.class, PermissionParam.class, OverwriteParam.class,
         ReplicationParam.class, BlockSizeParam.class, DataParam.class});
@@ -388,6 +389,25 @@ public ReplicationParam() {
     }
   }
 
+  /**
+   * Class for concat sources parameter.
+   */
+  @InterfaceAudience.Private
+  public static class SourcesParam extends StringParam {
+
+    /**
+     * Parameter name.
+     */
+    public static final String NAME = HttpFSFileSystem.SOURCES_PARAM;
+
+    /**
+     * Constructor.
+     */
+    public SourcesParam() {
+      super(NAME, null);
+    }
+  }
+
   /**
    * Class for to-path parameter.
   */
diff --git a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/HttpFSServer.java b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/HttpFSServer.java
index 0c3418f5c4..ca7edcc7ac 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/HttpFSServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/HttpFSServer.java
@@ -22,22 +22,23 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.http.client.HttpFSFileSystem;
-import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.OperationParam;
 import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.AccessTimeParam;
 import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.BlockSizeParam;
 import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.DataParam;
-import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.RecursiveParam;
+import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.DestinationParam;
 import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.DoAsParam;
 import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.FilterParam;
 import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.GroupParam;
 import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.LenParam;
 import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.ModifiedTimeParam;
 import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.OffsetParam;
+import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.OperationParam;
 import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.OverwriteParam;
 import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.OwnerParam;
 import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.PermissionParam;
+import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.RecursiveParam;
 import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.ReplicationParam;
-import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.DestinationParam;
+import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.SourcesParam;
 import org.apache.hadoop.lib.service.FileSystemAccess;
 import org.apache.hadoop.lib.service.FileSystemAccessException;
 import org.apache.hadoop.lib.service.Groups;
@@ -403,9 +404,9 @@ public Response post(InputStream is,
     Response response;
     path = makeAbsolute(path);
     MDC.put(HttpFSFileSystem.OP_PARAM, op.value().name());
-    String doAs = params.get(DoAsParam.NAME, DoAsParam.class);
     switch (op.value()) {
       case APPEND: {
+        String doAs = params.get(DoAsParam.NAME, DoAsParam.class);
         Boolean hasData = params.get(DataParam.NAME, DataParam.class);
         if (!hasData) {
           response = Response.temporaryRedirect(
@@ -420,6 +421,16 @@ public Response post(InputStream is,
         }
         break;
       }
+      case CONCAT: {
+        String sources = params.get(SourcesParam.NAME, SourcesParam.class);
+
+        FSOperations.FSConcat command =
+            new FSOperations.FSConcat(path, sources.split(","));
+        fsExecute(user, null, command);
+        AUDIT_LOG.info("[{}]", path);
+        response = Response.ok().build();
+        break;
+      }
       default: {
         throw new IOException(
             MessageFormat.format("Invalid HTTP POST operation [{0}]",
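Server side, the pieces above fit together as follows: HttpFSParametersProvider parses the sources query parameter, HttpFSServer splits it on commas, and FSOperations.FSConcat resolves each entry to a Path before calling FileSystem.concat. A rough sketch of that flow (illustration only; the paths are borrowed from the test below, and the real server obtains a FileSystem for the authenticated user via fsExecute rather than FileSystem.get):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.http.server.FSOperations;

    public class ConcatFlowSketch {
      public static void main(String[] args) throws Exception {
        // Raw value of the "sources" query parameter,
        // e.g. ?sources=/test/bar.txt,/test/derp.txt
        String sources = "/test/bar.txt,/test/derp.txt";

        // HttpFSServer's CONCAT case: split the parameter, build the executor.
        FSOperations.FSConcat command =
            new FSOperations.FSConcat("/test/foo.txt", sources.split(","));

        // Run the executor; the default FileSystem is used purely for
        // illustration (concat is only supported by HDFS-backed filesystems).
        command.execute(FileSystem.get(new Configuration()));
      }
    }
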
diff --git a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/test/java/org/apache/hadoop/fs/http/client/BaseTestHttpFSWith.java b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/test/java/org/apache/hadoop/fs/http/client/BaseTestHttpFSWith.java
index d44bcc4494..3d96fd8326 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/test/java/org/apache/hadoop/fs/http/client/BaseTestHttpFSWith.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/test/java/org/apache/hadoop/fs/http/client/BaseTestHttpFSWith.java
@@ -28,6 +28,8 @@
 import org.apache.hadoop.fs.http.server.HttpFSServerWebApp;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.test.HFSTestCase;
 import org.apache.hadoop.test.HadoopUsersConfTestHelper;
@@ -206,6 +208,30 @@ private void testAppend() throws Exception {
     }
   }
 
+  private void testConcat() throws Exception {
+    Configuration config = getProxiedFSConf();
+    config.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1024);
+    if (!isLocalFS()) {
+      FileSystem fs = FileSystem.get(config);
+      fs.mkdirs(getProxiedFSTestDir());
+      Path path1 = new Path("/test/foo.txt");
+      Path path2 = new Path("/test/bar.txt");
+      Path path3 = new Path("/test/derp.txt");
+      DFSTestUtil.createFile(fs, path1, 1024, (short) 3, 0);
+      DFSTestUtil.createFile(fs, path2, 1024, (short) 3, 0);
+      DFSTestUtil.createFile(fs, path3, 1024, (short) 3, 0);
+      fs.close();
+      fs = getHttpFSFileSystem();
+      fs.concat(path1, new Path[]{path2, path3});
+      fs.close();
+      fs = FileSystem.get(config);
+      Assert.assertTrue(fs.exists(path1));
+      Assert.assertFalse(fs.exists(path2));
+      Assert.assertFalse(fs.exists(path3));
+      fs.close();
+    }
+  }
+
   private void testRename() throws Exception {
     FileSystem fs = FileSystem.get(getProxiedFSConf());
     Path path = new Path(getProxiedFSTestDir(), "foo");
@@ -450,7 +476,7 @@ private void testContentSummary() throws Exception {
   }
 
   protected enum Operation {
-    GET, OPEN, CREATE, APPEND, RENAME, DELETE, LIST_STATUS, WORKING_DIRECTORY, MKDIRS,
+    GET, OPEN, CREATE, APPEND, CONCAT, RENAME, DELETE, LIST_STATUS, WORKING_DIRECTORY, MKDIRS,
     SET_TIMES, SET_PERMISSION, SET_OWNER, SET_REPLICATION, CHECKSUM, CONTENT_SUMMARY
   }
@@ -468,6 +494,9 @@ private void operation(Operation op) throws Exception {
       case APPEND:
         testAppend();
         break;
+      case CONCAT:
+        testConcat();
+        break;
       case RENAME:
         testRename();
         break;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 7008d74ce2..726a52333c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -505,6 +505,8 @@ Release 2.0.3-alpha - Unreleased
 
     HDFS-3598. WebHDFS support for file concat. (Plamen Jeliazkov via shv)
 
+    HDFS-4456. Add concat to HttpFS and WebHDFS REST API docs. (plamenj2003 via tucu)
+
   OPTIMIZATIONS
 
     HDFS-3429. DataNode reads checksums even if client does not need them (todd)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/ConcatSourcesParam.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/ConcatSourcesParam.java
index c29f2329c4..e6afbe3e4e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/ConcatSourcesParam.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/ConcatSourcesParam.java
@@ -21,7 +21,7 @@
 /** The concat source paths parameter. */
 public class ConcatSourcesParam extends StringParam {
   /** Parameter name. */
-  public static final String NAME = "srcs";
+  public static final String NAME = "sources";
 
   public static final String DEFAULT = NULL;
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/apt/WebHDFS.apt.vm b/hadoop-hdfs-project/hadoop-hdfs/src/site/apt/WebHDFS.apt.vm
index 38b8dc8ab0..90f8dabce7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/site/apt/WebHDFS.apt.vm
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/apt/WebHDFS.apt.vm
@@ -109,6 +109,9 @@ WebHDFS REST API
   * {{{Append to a File}<<<APPEND>>>}}
     (see {{{../../api/org/apache/hadoop/fs/FileSystem.html}FileSystem}}.append)
 
+  * {{{Concat File(s)}<<<CONCAT>>>}}
+    (see {{{../../api/org/apache/hadoop/fs/FileSystem.html}FileSystem}}.concat)
+
 * HTTP DELETE
 
   * {{{Delete a File/Directory}<<<DELETE>>>}}
@@ -299,6 +302,32 @@ Content-Length: 0
   {{{../../api/org/apache/hadoop/fs/FileSystem.html}FileSystem}}.append
 
 
+** {Concat File(s)}
+
+  * Submit a HTTP POST request.
+
++---------------------------------
+curl -i -X POST "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=CONCAT&sources=<PATHS>"
++---------------------------------
+
+  The client receives a response with zero content length:
+
++---------------------------------
+HTTP/1.1 200 OK
+Content-Length: 0
++---------------------------------
+
+  []
+
+  This REST API call is available as of Hadoop version 2.0.3.
+  Please note that <PATHS> is a comma separated list of absolute paths.
+  (Example: sources=/test/file1,/test/file2,/test/file3)
+
+  See also:
+  {{{Sources}<<<sources>>>}},
+  {{{../../api/org/apache/hadoop/fs/FileSystem.html}FileSystem}}.concat
+
+
 ** {Open and Read a File}
 
   * Submit a HTTP GET request with automatically following redirects.
@@ -1727,6 +1756,29 @@ var tokenProperties =
   {{{Set Replication Factor}<<<SETREPLICATION>>>}}
 
 
+** {Sources}
+
+*----------------+-------------------------------------------------------------------+
+|| Name          | <<<sources>>> |
+*----------------+-------------------------------------------------------------------+
+|| Description   | The comma separated absolute paths used for concatenation. |
+*----------------+-------------------------------------------------------------------+
+|| Type          | String |
+*----------------+-------------------------------------------------------------------+
+|| Default Value | \<empty\> |
+*----------------+-------------------------------------------------------------------+
+|| Valid Values  | A list of comma separated absolute FileSystem paths without scheme and authority. |
+*----------------+-------------------------------------------------------------------+
+|| Syntax        | See the note in {{Delegation}}. |
+*----------------+-------------------------------------------------------------------+
+
+  <<Note>> that sources are absolute FileSystem paths.
+
+
+  See also:
+  {{{Concat File(s)}<<<CONCAT>>>}}
+
+
 ** {Token}
 
 *----------------+-------------------------------------------------------------------+
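For concreteness, a hypothetical exchange against the documented endpoint (host, port, and paths invented for illustration) mirrors the test above: the two source files are appended to the target and then disappear from the namespace.

    curl -i -X POST "http://namenode.example.com:50070/webhdfs/v1/test/foo.txt?op=CONCAT&sources=/test/bar.txt,/test/derp.txt"

    HTTP/1.1 200 OK
    Content-Length: 0

After a 200 response, /test/foo.txt holds the concatenated blocks and /test/bar.txt and /test/derp.txt no longer exist.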