From 6c3b59505b863f03629da52a1e9b886fe9b496d0 Mon Sep 17 00:00:00 2001 From: Tsz-wo Sze Date: Tue, 13 Sep 2011 08:34:27 +0000 Subject: [PATCH] HDFS-2317. Support read access to HDFS in webhdfs. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1170085 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 2 + .../hadoop/hdfs/ByteRangeInputStream.java | 5 +- .../hadoop/hdfs/DistributedFileSystem.java | 3 +- .../web/resources/DatanodeWebHdfsMethods.java | 61 ++++++++- .../web/resources/NamenodeWebHdfsMethods.java | 127 +++++++++++++++--- .../hadoop/hdfs/web/WebHdfsFileSystem.java | 69 ++++++++-- .../hadoop/hdfs/web/resources/GetOpParam.java | 4 + .../hdfs/web/resources/LengthParam.java | 49 +++++++ .../hdfs/web/resources/OffsetParam.java | 49 +++++++ .../hdfs/web/resources/UriFsPathParam.java | 2 +- .../apache/hadoop/hdfs/MiniDFSCluster.java | 13 -- .../hdfs/web/TestFSMainOperationsWebHdfs.java | 73 ++++------ .../web/TestWebHdfsFileSystemContract.java | 48 ++----- 13 files changed, 367 insertions(+), 138 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/LengthParam.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/OffsetParam.java diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index fd8d408c87..f43475a331 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -8,6 +8,8 @@ Trunk (unreleased changes) HDFS-2284. Add a new FileSystem, webhdfs://, for supporting write Http access to HDFS. (szetszwo) + HDFS-2317. Support read access to HDFS in webhdfs. (szetszwo) + IMPROVEMENTS HADOOP-7524 Change RPC to allow multiple protocols including multuple versions of the same protocol (sanjay Radia) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ByteRangeInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ByteRangeInputStream.java index fcdf6cb564..945f4ac033 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ByteRangeInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ByteRangeInputStream.java @@ -33,7 +33,7 @@ * is made on the successive read(). The normal input stream functions are * connected to the currently active input stream. */ -class ByteRangeInputStream extends FSInputStream { +public class ByteRangeInputStream extends FSInputStream { /** * This class wraps a URL to allow easy mocking when testing. The URL class @@ -71,7 +71,8 @@ enum StreamStatus { StreamStatus status = StreamStatus.SEEK; - ByteRangeInputStream(final URL url) { + /** Create an input stream with the URL. 
*/ + public ByteRangeInputStream(final URL url) { this(new URLOpener(url), new URLOpener(null)); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java index 68f8616941..68ecf0f4af 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java @@ -229,12 +229,11 @@ public boolean recoverLease(Path f) throws IOException { return dfs.recoverLease(getPathName(f)); } - @SuppressWarnings("deprecation") @Override public FSDataInputStream open(Path f, int bufferSize) throws IOException { statistics.incrementReadOps(1); return new DFSClient.DFSDataInputStream( - dfs.open(getPathName(f), bufferSize, verifyChecksum, statistics)); + dfs.open(getPathName(f), bufferSize, verifyChecksum)); } /** This optional operation is not yet supported. */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java index 5a9bbb4114..e270a961bf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java @@ -19,6 +19,7 @@ import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import java.net.InetSocketAddress; import java.net.URI; import java.net.URISyntaxException; @@ -27,6 +28,7 @@ import javax.servlet.ServletContext; import javax.ws.rs.Consumes; import javax.ws.rs.DefaultValue; +import javax.ws.rs.GET; import javax.ws.rs.POST; import javax.ws.rs.PUT; import javax.ws.rs.Path; @@ -36,6 +38,7 @@ import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; +import javax.ws.rs.core.StreamingOutput; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -43,12 +46,16 @@ import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.hdfs.DFSClient; +import org.apache.hadoop.hdfs.DFSClient.DFSDataInputStream; import org.apache.hadoop.hdfs.DFSOutputStream; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.web.WebHdfsFileSystem; import org.apache.hadoop.hdfs.web.resources.BlockSizeParam; import org.apache.hadoop.hdfs.web.resources.BufferSizeParam; +import org.apache.hadoop.hdfs.web.resources.GetOpParam; +import org.apache.hadoop.hdfs.web.resources.LengthParam; +import org.apache.hadoop.hdfs.web.resources.OffsetParam; import org.apache.hadoop.hdfs.web.resources.OverwriteParam; import org.apache.hadoop.hdfs.web.resources.Param; import org.apache.hadoop.hdfs.web.resources.PermissionParam; @@ -61,7 +68,7 @@ /** Web-hdfs DataNode implementation. 
*/ @Path("") public class DatanodeWebHdfsMethods { - private static final Log LOG = LogFactory.getLog(DatanodeWebHdfsMethods.class); + public static final Log LOG = LogFactory.getLog(DatanodeWebHdfsMethods.class); private @Context ServletContext context; @@ -166,4 +173,56 @@ public Response post( throw new UnsupportedOperationException(op + " is not supported"); } } + + /** Handle HTTP GET request. */ + @GET + @Path("{" + UriFsPathParam.NAME + ":.*}") + @Produces({MediaType.APPLICATION_OCTET_STREAM, MediaType.APPLICATION_JSON}) + public Response get( + @PathParam(UriFsPathParam.NAME) final UriFsPathParam path, + @QueryParam(GetOpParam.NAME) @DefaultValue(GetOpParam.DEFAULT) + final GetOpParam op, + @QueryParam(OffsetParam.NAME) @DefaultValue(OffsetParam.DEFAULT) + final OffsetParam offset, + @QueryParam(LengthParam.NAME) @DefaultValue(LengthParam.DEFAULT) + final LengthParam length, + @QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT) + final BufferSizeParam bufferSize + ) throws IOException, URISyntaxException { + + if (LOG.isTraceEnabled()) { + LOG.trace(op + ": " + path + + Param.toSortedString(", ", offset, length, bufferSize)); + } + + final String fullpath = path.getAbsolutePath(); + final DataNode datanode = (DataNode)context.getAttribute("datanode"); + + switch(op.getValue()) { + case OPEN: + { + final Configuration conf = new Configuration(datanode.getConf()); + final InetSocketAddress nnRpcAddr = NameNode.getAddress(conf); + final DFSClient dfsclient = new DFSClient(nnRpcAddr, conf); + final DFSDataInputStream in = new DFSClient.DFSDataInputStream( + dfsclient.open(fullpath, bufferSize.getValue(), true)); + in.seek(offset.getValue()); + + final StreamingOutput streaming = new StreamingOutput() { + @Override + public void write(final OutputStream out) throws IOException { + final Long n = length.getValue(); + if (n == null) { + IOUtils.copyBytes(in, out, bufferSize.getValue()); + } else { + IOUtils.copyBytes(in, out, n, false); + } + } + }; + return Response.ok(streaming).type(MediaType.APPLICATION_OCTET_STREAM).build(); + } + default: + throw new UnsupportedOperationException(op + " is not supported"); + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java index c3b2e4ad74..362060a22d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java @@ -17,8 +17,11 @@ */ package org.apache.hadoop.hdfs.server.namenode.web.resources; +import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; +import java.io.PrintStream; import java.net.URI; import java.net.URISyntaxException; import java.util.EnumSet; @@ -37,11 +40,13 @@ import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; +import javax.ws.rs.core.StreamingOutput; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.Options; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.DirectoryListing; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import 
org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; @@ -58,7 +63,9 @@ import org.apache.hadoop.hdfs.web.resources.GetOpParam; import org.apache.hadoop.hdfs.web.resources.GroupParam; import org.apache.hadoop.hdfs.web.resources.HttpOpParam; +import org.apache.hadoop.hdfs.web.resources.LengthParam; import org.apache.hadoop.hdfs.web.resources.ModificationTimeParam; +import org.apache.hadoop.hdfs.web.resources.OffsetParam; import org.apache.hadoop.hdfs.web.resources.OverwriteParam; import org.apache.hadoop.hdfs.web.resources.OwnerParam; import org.apache.hadoop.hdfs.web.resources.Param; @@ -79,15 +86,23 @@ public class NamenodeWebHdfsMethods { private @Context ServletContext context; private static DatanodeInfo chooseDatanode(final NameNode namenode, - final String path, final HttpOpParam.Op op) throws IOException { - if (op == PostOpParam.Op.APPEND) { - final HdfsFileStatus status = namenode.getRpcServer().getFileInfo(path); + final String path, final HttpOpParam.Op op, final long openOffset + ) throws IOException { + if (op == GetOpParam.Op.OPEN || op == PostOpParam.Op.APPEND) { + final NamenodeProtocols np = namenode.getRpcServer(); + final HdfsFileStatus status = np.getFileInfo(path); final long len = status.getLen(); + if (op == GetOpParam.Op.OPEN && (openOffset < 0L || openOffset >= len)) { + throw new IOException("Offset=" + openOffset + " out of the range [0, " + + len + "); " + op + ", path=" + path); + } + if (len > 0) { - final LocatedBlocks locations = namenode.getRpcServer().getBlockLocations(path, len-1, 1); + final long offset = op == GetOpParam.Op.OPEN? openOffset: len - 1; + final LocatedBlocks locations = np.getBlockLocations(path, offset, 1); final int count = locations.locatedBlockCount(); if (count > 0) { - return JspHelper.bestNode(locations.get(count - 1)); + return JspHelper.bestNode(locations.get(0)); } } } @@ -98,9 +113,9 @@ private static DatanodeInfo chooseDatanode(final NameNode namenode, } private static URI redirectURI(final NameNode namenode, - final String path, final HttpOpParam.Op op, + final String path, final HttpOpParam.Op op, final long openOffset, final Param... 
parameters) throws URISyntaxException, IOException { - final DatanodeInfo dn = chooseDatanode(namenode, path, op); + final DatanodeInfo dn = chooseDatanode(namenode, path, op, openOffset); final String query = op.toQueryString() + Param.toSortedString("&", parameters); final String uripath = "/" + WebHdfsFileSystem.PATH_PREFIX + path; @@ -148,8 +163,9 @@ public Response put( if (LOG.isTraceEnabled()) { LOG.trace(op + ": " + path - + Param.toSortedString(", ", dstPath, owner, group, permission, - overwrite, bufferSize, replication, blockSize)); + + Param.toSortedString(", ", dstPath, owner, group, permission, + overwrite, bufferSize, replication, blockSize, + modificationTime, accessTime, renameOptions)); } final String fullpath = path.getAbsolutePath(); @@ -159,7 +175,7 @@ public Response put( switch(op.getValue()) { case CREATE: { - final URI uri = redirectURI(namenode, fullpath, op.getValue(), + final URI uri = redirectURI(namenode, fullpath, op.getValue(), -1L, permission, overwrite, bufferSize, replication, blockSize); return Response.temporaryRedirect(uri).build(); } @@ -234,7 +250,8 @@ public Response post( switch(op.getValue()) { case APPEND: { - final URI uri = redirectURI(namenode, fullpath, op.getValue(), bufferSize); + final URI uri = redirectURI(namenode, fullpath, op.getValue(), -1L, + bufferSize); return Response.temporaryRedirect(uri).build(); } default: @@ -250,9 +267,15 @@ public Response post( @Produces({MediaType.APPLICATION_OCTET_STREAM, MediaType.APPLICATION_JSON}) public Response root( @QueryParam(GetOpParam.NAME) @DefaultValue(GetOpParam.DEFAULT) - final GetOpParam op - ) throws IOException { - return get(ROOT, op); + final GetOpParam op, + @QueryParam(OffsetParam.NAME) @DefaultValue(OffsetParam.DEFAULT) + final OffsetParam offset, + @QueryParam(LengthParam.NAME) @DefaultValue(LengthParam.DEFAULT) + final LengthParam length, + @QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT) + final BufferSizeParam bufferSize + ) throws IOException, URISyntaxException { + return get(ROOT, op, offset, length, bufferSize); } /** Handle HTTP GET request. 
*/ @@ -262,27 +285,89 @@ public Response root( public Response get( @PathParam(UriFsPathParam.NAME) final UriFsPathParam path, @QueryParam(GetOpParam.NAME) @DefaultValue(GetOpParam.DEFAULT) - final GetOpParam op - ) throws IOException { + final GetOpParam op, + @QueryParam(OffsetParam.NAME) @DefaultValue(OffsetParam.DEFAULT) + final OffsetParam offset, + @QueryParam(LengthParam.NAME) @DefaultValue(LengthParam.DEFAULT) + final LengthParam length, + @QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT) + final BufferSizeParam bufferSize + ) throws IOException, URISyntaxException { if (LOG.isTraceEnabled()) { LOG.trace(op + ", " + path - + Param.toSortedString(", ")); + + Param.toSortedString(", ", offset, length, bufferSize)); } + final NameNode namenode = (NameNode)context.getAttribute("name.node"); + final String fullpath = path.getAbsolutePath(); + final NamenodeProtocols np = namenode.getRpcServer(); + switch(op.getValue()) { + case OPEN: + { + final URI uri = redirectURI(namenode, fullpath, op.getValue(), + offset.getValue(), offset, length, bufferSize); + return Response.temporaryRedirect(uri).build(); + } case GETFILESTATUS: - final NameNode namenode = (NameNode)context.getAttribute("name.node"); - final String fullpath = path.getAbsolutePath(); - final HdfsFileStatus status = namenode.getRpcServer().getFileInfo(fullpath); + { + final HdfsFileStatus status = np.getFileInfo(fullpath); final String js = JsonUtil.toJsonString(status); return Response.ok(js).type(MediaType.APPLICATION_JSON).build(); - + } + case LISTSTATUS: + { + final StreamingOutput streaming = getListingStream(np, fullpath); + return Response.ok(streaming).type(MediaType.APPLICATION_JSON).build(); + } default: throw new UnsupportedOperationException(op + " is not supported"); } } + private static DirectoryListing getDirectoryListing(final NamenodeProtocols np, + final String p, byte[] startAfter) throws IOException { + final DirectoryListing listing = np.getListing(p, startAfter, false); + if (listing == null) { // the directory does not exist + throw new FileNotFoundException("File " + p + " does not exist."); + } + return listing; + } + + private static StreamingOutput getListingStream(final NamenodeProtocols np, + final String p) throws IOException { + final DirectoryListing first = getDirectoryListing(np, p, + HdfsFileStatus.EMPTY_NAME); + + return new StreamingOutput() { + @Override + public void write(final OutputStream outstream) throws IOException { + final PrintStream out = new PrintStream(outstream); + out.print('['); + + final HdfsFileStatus[] partial = first.getPartialListing(); + if (partial.length > 0) { + out.print(JsonUtil.toJsonString(partial[0])); + } + for(int i = 1; i < partial.length; i++) { + out.println(','); + out.print(JsonUtil.toJsonString(partial[i])); + } + + for(DirectoryListing curr = first; curr.hasMore(); ) { + curr = getDirectoryListing(np, p, curr.getLastName()); + for(HdfsFileStatus s : curr.getPartialListing()) { + out.println(','); + out.print(JsonUtil.toJsonString(s)); + } + } + + out.println(']'); + } + }; + } + /** Handle HTTP DELETE request. 
*/ @DELETE @Path("{path:.*}") diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java index be428acfe9..72d86a011c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java @@ -30,6 +30,7 @@ import java.util.Map; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileAlreadyExistsException; import org.apache.hadoop.fs.FileStatus; @@ -37,6 +38,7 @@ import org.apache.hadoop.fs.ParentNotDirectoryException; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.ByteRangeInputStream; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.HftpFileSystem; import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException; @@ -122,12 +124,11 @@ private Path makeAbsolute(Path f) { } @SuppressWarnings("unchecked") - private static Map jsonParse(final InputStream in - ) throws IOException { + private static T jsonParse(final InputStream in) throws IOException { if (in == null) { throw new IOException("The input stream is null."); } - return (Map)JSON.parse(new InputStreamReader(in)); + return (T)JSON.parse(new InputStreamReader(in)); } private static void validateResponse(final HttpOpParam.Op op, @@ -138,7 +139,7 @@ private static void validateResponse(final HttpOpParam.Op op, try { m = jsonParse(conn.getErrorStream()); } catch(IOException e) { - throw new IOException("Unexpected HTTP response: code = " + code + " != " + throw new IOException("Unexpected HTTP response: code=" + code + " != " + op.getExpectedHttpResponseCode() + ", " + op.toQueryString() + ", message=" + conn.getResponseMessage(), e); } @@ -155,22 +156,26 @@ private static void validateResponse(final HttpOpParam.Op op, } } - @Override - protected HttpURLConnection openConnection(String path, String query) - throws IOException { - query = addDelegationTokenParam(query); + private URL toUrl(final HttpOpParam.Op op, final Path fspath, + final Param... parameters) throws IOException { + //initialize URI path and query + final String path = "/" + PATH_PREFIX + + makeQualified(fspath).toUri().getPath(); + final String query = op.toQueryString() + + Param.toSortedString("&", parameters); final URL url = getNamenodeURL(path, query); - return (HttpURLConnection)url.openConnection(); + if (LOG.isTraceEnabled()) { + LOG.trace("url=" + url); + } + return url; } private HttpURLConnection httpConnect(final HttpOpParam.Op op, final Path fspath, final Param... 
parameters) throws IOException { - //initialize URI path and query - final String uripath = "/" + PATH_PREFIX + makeQualified(fspath).toUri().getPath(); - final String query = op.toQueryString() + Param.toSortedString("&", parameters); + final URL url = toUrl(op, fspath, parameters); //connect and get response - final HttpURLConnection conn = openConnection(uripath, query); + final HttpURLConnection conn = (HttpURLConnection)url.openConnection(); try { conn.setRequestMethod(op.getType().toString()); conn.setDoOutput(op.getDoOutput()); @@ -186,7 +191,17 @@ private HttpURLConnection httpConnect(final HttpOpParam.Op op, final Path fspath } } - private Map run(final HttpOpParam.Op op, final Path fspath, + /** + * Run a http operation. + * Connect to the http server, validate response, and obtain the JSON output. + * + * @param op http operation + * @param fspath file system path + * @param parameters parameters for the operation + * @return a JSON object, e.g. Object[], Map, etc. + * @throws IOException + */ + private T run(final HttpOpParam.Op op, final Path fspath, final Param... parameters) throws IOException { final HttpURLConnection conn = httpConnect(op, fspath, parameters); validateResponse(op, conn); @@ -342,4 +357,30 @@ public boolean delete(Path f, boolean recursive) throws IOException { final Map json = run(op, f, new RecursiveParam(recursive)); return (Boolean)json.get(op.toString()); } + + @Override + public FSDataInputStream open(final Path f, final int buffersize + ) throws IOException { + statistics.incrementReadOps(1); + final HttpOpParam.Op op = GetOpParam.Op.OPEN; + final URL url = toUrl(op, f, new BufferSizeParam(buffersize)); + return new FSDataInputStream(new ByteRangeInputStream(url)); + } + + @Override + public FileStatus[] listStatus(final Path f) throws IOException { + statistics.incrementReadOps(1); + + final HttpOpParam.Op op = GetOpParam.Op.LISTSTATUS; + final Object[] array = run(op, f); + + //convert FileStatus + final FileStatus[] statuses = new FileStatus[array.length]; + for(int i = 0; i < array.length; i++) { + @SuppressWarnings("unchecked") + final Map m = (Map)array[i]; + statuses[i] = makeQualified(JsonUtil.toFileStatus(m), f); + } + return statuses; + } } \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/GetOpParam.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/GetOpParam.java index 0d19fb46e9..6f11871ebb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/GetOpParam.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/GetOpParam.java @@ -26,7 +26,11 @@ public class GetOpParam extends HttpOpParam { /** Get operations. 
*/ public static enum Op implements HttpOpParam.Op { + OPEN(HttpURLConnection.HTTP_OK), + GETFILESTATUS(HttpURLConnection.HTTP_OK), + LISTSTATUS(HttpURLConnection.HTTP_OK), + NULL(HttpURLConnection.HTTP_NOT_IMPLEMENTED); final int expectedHttpResponseCode; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/LengthParam.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/LengthParam.java new file mode 100644 index 0000000000..90d4f6289d --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/LengthParam.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.web.resources; + +/** Length parameter. */ +public class LengthParam extends LongParam { + /** Parameter name. */ + public static final String NAME = "length"; + /** Default parameter value. */ + public static final String DEFAULT = NULL; + + private static final Domain DOMAIN = new Domain(NAME); + + /** + * Constructor. + * @param value the parameter value. + */ + public LengthParam(final Long value) { + super(DOMAIN, value); + } + + /** + * Constructor. + * @param str a string representation of the parameter value. + */ + public LengthParam(final String str) { + this(DOMAIN.parse(str)); + } + + @Override + public String getName() { + return NAME; + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/OffsetParam.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/OffsetParam.java new file mode 100644 index 0000000000..8b3654dbd8 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/OffsetParam.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.web.resources; + +/** Offset parameter. 
*/ +public class OffsetParam extends LongParam { + /** Parameter name. */ + public static final String NAME = "offset"; + /** Default parameter value. */ + public static final String DEFAULT = "0"; + + private static final Domain DOMAIN = new Domain(NAME); + + /** + * Constructor. + * @param value the parameter value. + */ + public OffsetParam(final Long value) { + super(DOMAIN, value); + } + + /** + * Constructor. + * @param str a string representation of the parameter value. + */ + public OffsetParam(final String str) { + this(DOMAIN.parse(str)); + } + + @Override + public String getName() { + return NAME; + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/UriFsPathParam.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/UriFsPathParam.java index 762b6e2031..2e12697076 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/UriFsPathParam.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/UriFsPathParam.java @@ -39,7 +39,7 @@ public String getName() { /** @return the absolute path. */ public final String getAbsolutePath() { - final String path = getValue(); + final String path = getValue(); //The first / has been stripped out. return path == null? null: "/" + path; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java index f1cf817e20..8ebae4e57a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java @@ -63,7 +63,6 @@ import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; import org.apache.hadoop.hdfs.tools.DFSAdmin; -import org.apache.hadoop.hdfs.web.WebHdfsFileSystem; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.net.DNSToSwitchMapping; import org.apache.hadoop.net.NetUtils; @@ -1461,18 +1460,6 @@ public HftpFileSystem getHftpFileSystem(int nnIndex) throws IOException { } } - /** - * @return a {@link WebHdfsFileSystem} object. - */ - public WebHdfsFileSystem getWebHdfsFileSystem() throws IOException { - final String str = WebHdfsFileSystem.SCHEME + "://" + conf.get("dfs.http.address"); - try { - return (WebHdfsFileSystem)FileSystem.get(new URI(str), conf); - } catch (URISyntaxException e) { - throw new IOException(e); - } - } - /** * @return a {@link HftpFileSystem} object as specified user. 
*/ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestFSMainOperationsWebHdfs.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestFSMainOperationsWebHdfs.java index dd6e590cd7..1addbf00de 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestFSMainOperationsWebHdfs.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestFSMainOperationsWebHdfs.java @@ -17,80 +17,63 @@ */ package org.apache.hadoop.hdfs.web; - -import static org.apache.hadoop.fs.FileSystemTestHelper.exists; -import static org.apache.hadoop.fs.FileSystemTestHelper.getDefaultBlockSize; -import static org.apache.hadoop.fs.FileSystemTestHelper.getTestRootPath; - -import java.io.IOException; +import java.net.URI; import org.apache.commons.logging.impl.Log4JLogger; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FSMainOperationsBaseTest; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.server.datanode.web.resources.DatanodeWebHdfsMethods; import org.apache.hadoop.hdfs.web.resources.ExceptionHandler; import org.apache.log4j.Level; -import org.junit.Assert; +import org.junit.AfterClass; +import org.junit.BeforeClass; import org.junit.Test; public class TestFSMainOperationsWebHdfs extends FSMainOperationsBaseTest { { ((Log4JLogger)ExceptionHandler.LOG).getLogger().setLevel(Level.ALL); + ((Log4JLogger)DatanodeWebHdfsMethods.LOG).getLogger().setLevel(Level.ALL); } - private static final MiniDFSCluster cluster; - private static final Path defaultWorkingDirectory; + private static MiniDFSCluster cluster = null; + private static Path defaultWorkingDirectory; - static { + @BeforeClass + public static void setupCluster() { Configuration conf = new Configuration(); try { cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build(); - fSys = cluster.getWebHdfsFileSystem(); + cluster.waitActive(); + + final String uri = WebHdfsFileSystem.SCHEME + "://" + + conf.get(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY); + fSys = FileSystem.get(new URI(uri), conf); defaultWorkingDirectory = fSys.getWorkingDirectory(); - } catch (IOException e) { + } catch (Exception e) { throw new RuntimeException(e); } } + @AfterClass + public static void shutdownCluster() { + if (cluster != null) { + cluster.shutdown(); + cluster = null; + } + } + @Override protected Path getDefaultWorkingDirectory() { return defaultWorkingDirectory; } - - /** Override the following method without using position read. 
*/ - @Override - protected void writeReadAndDelete(int len) throws IOException { - Path path = getTestRootPath(fSys, "test/hadoop/file"); - fSys.mkdirs(path.getParent()); - - FSDataOutputStream out = - fSys.create(path, false, 4096, (short) 1, getDefaultBlockSize() ); - out.write(data, 0, len); - out.close(); - - Assert.assertTrue("Exists", exists(fSys, path)); - Assert.assertEquals("Length", len, fSys.getFileStatus(path).getLen()); - - FSDataInputStream in = fSys.open(path); - for (int i = 0; i < len; i++) { - final int b = in.read(); - Assert.assertEquals("Position " + i, data[i], b); - } - in.close(); - Assert.assertTrue("Deleted", fSys.delete(path, false)); - Assert.assertFalse("No longer exists", exists(fSys, path)); - } - - //The following tests failed for HftpFileSystem, - //Disable it for WebHdfsFileSystem - @Test - public void testListStatusThrowsExceptionForNonExistentFile() {} + //The following test failed since WebHdfsFileSystem did not support + //authentication. + //Disable it. @Test public void testListStatusThrowsExceptionForUnreadableDir() {} - @Test - public void testGlobStatusThrowsExceptionForNonExistentFile() {} } \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsFileSystemContract.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsFileSystemContract.java index 9080783631..39334e2693 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsFileSystemContract.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsFileSystemContract.java @@ -19,23 +19,23 @@ package org.apache.hadoop.hdfs.web; import java.io.IOException; +import java.net.URI; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystemContractBaseTest; -import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.apache.hadoop.security.UserGroupInformation; public class TestWebHdfsFileSystemContract extends FileSystemContractBaseTest { + private static final Configuration conf = new Configuration(); private static final MiniDFSCluster cluster; private String defaultWorkingDirectory; static { - Configuration conf = new Configuration(); try { cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build(); + cluster.waitActive(); } catch (IOException e) { throw new RuntimeException(e); } @@ -43,44 +43,14 @@ public class TestWebHdfsFileSystemContract extends FileSystemContractBaseTest { @Override protected void setUp() throws Exception { - fs = cluster.getWebHdfsFileSystem(); - defaultWorkingDirectory = "/user/" - + UserGroupInformation.getCurrentUser().getShortUserName(); + final String uri = WebHdfsFileSystem.SCHEME + "://" + + conf.get(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY); + fs = FileSystem.get(new URI(uri), conf); + defaultWorkingDirectory = fs.getWorkingDirectory().toUri().getPath(); } @Override protected String getDefaultWorkingDirectory() { return defaultWorkingDirectory; } - - /** Override the following method without using position read. 
*/ - @Override - protected void writeReadAndDelete(int len) throws IOException { - Path path = path("/test/hadoop/file"); - - fs.mkdirs(path.getParent()); - - FSDataOutputStream out = fs.create(path, false, - fs.getConf().getInt("io.file.buffer.size", 4096), - (short) 1, getBlockSize()); - out.write(data, 0, len); - out.close(); - - assertTrue("Exists", fs.exists(path)); - assertEquals("Length", len, fs.getFileStatus(path).getLen()); - - FSDataInputStream in = fs.open(path); - for (int i = 0; i < len; i++) { - final int b = in.read(); - assertEquals("Position " + i, data[i], b); - } - in.close(); - - assertTrue("Deleted", fs.delete(path, false)); - assertFalse("No longer exists", fs.exists(path)); - } - - //The following test failed for HftpFileSystem, - //Disable it for WebHdfsFileSystem - public void testListStatusThrowsExceptionForNonExistentFile() {} }
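
A minimal usage sketch of the read support this patch adds, not part of the patch itself: it opens a webhdfs:// FileSystem against the namenode HTTP address (the same way the updated contract tests do), lists a directory through the new LISTSTATUS operation, and reads a file through the new OPEN operation. The namenode host:port, the /tmp/sample.txt path, and the example class name are illustrative assumptions.

    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;

    public class WebHdfsReadExample {
      public static void main(String[] args) throws Exception {
        final Configuration conf = new Configuration();

        // Assumed namenode HTTP address; webhdfs is served from the namenode's HTTP port.
        final FileSystem fs = FileSystem.get(
            URI.create("webhdfs://namenode.example.com:50070"), conf);

        final Path file = new Path("/tmp/sample.txt");  // hypothetical file

        // LISTSTATUS: the namenode streams the directory listing as JSON and
        // WebHdfsFileSystem.listStatus() converts it back into FileStatus objects.
        for (FileStatus status : fs.listStatus(file.getParent())) {
          System.out.println(status.getPath() + "\t" + status.getLen());
        }

        // OPEN: the request goes to the namenode, which redirects to a datanode
        // holding the data; the datanode then streams the file contents back.
        final FSDataInputStream in = fs.open(file, 4096);
        try {
          IOUtils.copyBytes(in, System.out, 4096, false);
        } finally {
          IOUtils.closeStream(in);
        }
      }
    }

The same read path is reachable over plain HTTP: a GET carrying op=OPEN plus the optional offset, length, and buffersize query parameters introduced here is sent to the namenode, which answers with a temporary redirect to a datanode that serves the requested byte range.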