From 6afe3e0d22caa2b0752d52ddf7794c25a66cc9c8 Mon Sep 17 00:00:00 2001 From: Tsz-wo Sze Date: Thu, 3 Nov 2011 22:34:47 +0000 Subject: [PATCH] HDFS-2527. WebHdfs: remove the use of "Range" header in Open; use ugi username if renewer parameter is null in GetDelegationToken; response OK when setting replication for non-files; rename GETFILEBLOCKLOCATIONS to GET_BLOCK_LOCATIONS and state that it is a private unstable API; replace isDirectory and isSymlink with enum {FILE, DIRECTORY, SYMLINK} in HdfsFileStatus JSON object. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1197329 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 11 +- .../hadoop/hdfs/ByteRangeInputStream.java | 100 +++---------- .../apache/hadoop/hdfs/HftpFileSystem.java | 55 ++++++- .../web/resources/DatanodeWebHdfsMethods.java | 4 +- .../web/resources/NamenodeWebHdfsMethods.java | 12 +- .../org/apache/hadoop/hdfs/web/JsonUtil.java | 26 ++-- .../hadoop/hdfs/web/WebHdfsFileSystem.java | 90 ++++++++++-- .../hadoop/hdfs/web/resources/GetOpParam.java | 4 +- .../hadoop/hdfs/TestByteRangeInputStream.java | 50 +------ .../hdfs/web/TestOffsetUrlInputStream.java | 137 ++++++++++++++++++ .../web/TestWebHdfsFileSystemContract.java | 2 +- 11 files changed, 325 insertions(+), 166 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestOffsetUrlInputStream.java diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 9696804964..3a2e9bf9a4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -91,8 +91,6 @@ Trunk (unreleased changes) HDFS-2526. (Client)NamenodeProtocolTranslatorR23 do not need to keep a reference to rpcProxyWithoutRetry (atm) - HDFS-2416. distcp with a webhdfs uri on a secure cluster fails. (jitendra) - Release 0.23.1 - UNRELEASED INCOMPATIBLE CHANGES @@ -113,6 +111,15 @@ Release 0.23.1 - UNRELEASED BUG FIXES + HDFS-2416. distcp with a webhdfs uri on a secure cluster fails. (jitendra) + + HDFS-2527. WebHdfs: remove the use of "Range" header in Open; use ugi + username if renewer parameter is null in GetDelegationToken; response OK + when setting replication for non-files; rename GETFILEBLOCKLOCATIONS to + GET_BLOCK_LOCATIONS and state that it is a private unstable API; replace + isDirectory and isSymlink with enum {FILE, DIRECTORY, SYMLINK} in + HdfsFileStatus JSON object. (szetszwo) + Release 0.23.0 - 2011-11-01 INCOMPATIBLE CHANGES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ByteRangeInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ByteRangeInputStream.java index 80da377f34..36c7c1dec0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ByteRangeInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ByteRangeInputStream.java @@ -18,17 +18,13 @@ package org.apache.hadoop.hdfs; -import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; import java.net.HttpURLConnection; -import java.net.MalformedURLException; import java.net.URL; -import java.util.StringTokenizer; import org.apache.hadoop.fs.FSInputStream; import org.apache.hadoop.hdfs.server.namenode.StreamFile; -import org.apache.hadoop.hdfs.web.resources.OffsetParam; /** * To support HTTP byte streams, a new connection to an HTTP server needs to be @@ -37,16 +33,14 @@ import org.apache.hadoop.hdfs.web.resources.OffsetParam; * is made on the successive read(). The normal input stream functions are * connected to the currently active input stream. */ -public class ByteRangeInputStream extends FSInputStream { +public abstract class ByteRangeInputStream extends FSInputStream { /** * This class wraps a URL and provides method to open connection. * It can be overridden to change how a connection is opened. */ - public static class URLOpener { + public static abstract class URLOpener { protected URL url; - /** The url with offset parameter */ - protected URL offsetUrl; public URLOpener(URL u) { url = u; @@ -60,52 +54,9 @@ public class ByteRangeInputStream extends FSInputStream { return url; } - protected HttpURLConnection openConnection() throws IOException { - return (HttpURLConnection)offsetUrl.openConnection(); - } + protected abstract HttpURLConnection openConnection() throws IOException; - private HttpURLConnection openConnection(final long offset) throws IOException { - offsetUrl = offset == 0L? url: new URL(url + "&" + new OffsetParam(offset)); - final HttpURLConnection conn = openConnection(); - conn.setRequestMethod("GET"); - if (offset != 0L) { - conn.setRequestProperty("Range", "bytes=" + offset + "-"); - } - return conn; - } - } - - static private final String OFFSET_PARAM_PREFIX = OffsetParam.NAME + "="; - - /** Remove offset parameter, if there is any, from the url */ - static URL removeOffsetParam(final URL url) throws MalformedURLException { - String query = url.getQuery(); - if (query == null) { - return url; - } - final String lower = query.toLowerCase(); - if (!lower.startsWith(OFFSET_PARAM_PREFIX) - && !lower.contains("&" + OFFSET_PARAM_PREFIX)) { - return url; - } - - //rebuild query - StringBuilder b = null; - for(final StringTokenizer st = new StringTokenizer(query, "&"); - st.hasMoreTokens();) { - final String token = st.nextToken(); - if (!token.toLowerCase().startsWith(OFFSET_PARAM_PREFIX)) { - if (b == null) { - b = new StringBuilder("?").append(token); - } else { - b.append('&').append(token); - } - } - } - query = b == null? "": b.toString(); - - final String urlStr = url.toString(); - return new URL(urlStr.substring(0, urlStr.indexOf('?')) + query); + protected abstract HttpURLConnection openConnection(final long offset) throws IOException; } enum StreamStatus { @@ -120,11 +71,6 @@ public class ByteRangeInputStream extends FSInputStream { StreamStatus status = StreamStatus.SEEK; - /** Create an input stream with the URL. */ - public ByteRangeInputStream(final URL url) { - this(new URLOpener(url), new URLOpener(null)); - } - /** * Create with the specified URLOpeners. Original url is used to open the * stream for the first time. Resolved url is used in subsequent requests. @@ -136,6 +82,12 @@ public class ByteRangeInputStream extends FSInputStream { this.resolvedURL = r; } + protected abstract void checkResponseCode(final HttpURLConnection connection + ) throws IOException; + + protected abstract URL getResolvedUrl(final HttpURLConnection connection + ) throws IOException; + private InputStream getInputStream() throws IOException { if (status != StreamStatus.NORMAL) { @@ -150,32 +102,14 @@ public class ByteRangeInputStream extends FSInputStream { (resolvedURL.getURL() == null) ? originalURL : resolvedURL; final HttpURLConnection connection = opener.openConnection(startPos); - try { - connection.connect(); - final String cl = connection.getHeaderField(StreamFile.CONTENT_LENGTH); - filelength = (cl == null) ? -1 : Long.parseLong(cl); - if (HftpFileSystem.LOG.isDebugEnabled()) { - HftpFileSystem.LOG.debug("filelength = " + filelength); - } - in = connection.getInputStream(); - } catch (FileNotFoundException fnfe) { - throw fnfe; - } catch (IOException ioe) { - HftpFileSystem.throwIOExceptionFromConnection(connection, ioe); - } - - int respCode = connection.getResponseCode(); - if (startPos != 0 && respCode != HttpURLConnection.HTTP_PARTIAL) { - // We asked for a byte range but did not receive a partial content - // response... - throw new IOException("HTTP_PARTIAL expected, received " + respCode); - } else if (startPos == 0 && respCode != HttpURLConnection.HTTP_OK) { - // We asked for all bytes from the beginning but didn't receive a 200 - // response (none of the other 2xx codes are valid here) - throw new IOException("HTTP_OK expected, received " + respCode); - } + connection.connect(); + checkResponseCode(connection); - resolvedURL.setURL(removeOffsetParam(connection.getURL())); + final String cl = connection.getHeaderField(StreamFile.CONTENT_LENGTH); + filelength = (cl == null) ? -1 : Long.parseLong(cl); + in = connection.getInputStream(); + + resolvedURL.setURL(getResolvedUrl(connection)); status = StreamStatus.NORMAL; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HftpFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HftpFileSystem.java index 1ef00793de..7e86b9e851 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HftpFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HftpFileSystem.java @@ -372,13 +372,66 @@ public class HftpFileSystem extends FileSystem return query; } + static class RangeHeaderUrlOpener extends ByteRangeInputStream.URLOpener { + RangeHeaderUrlOpener(final URL url) { + super(url); + } + + @Override + protected HttpURLConnection openConnection() throws IOException { + return (HttpURLConnection)url.openConnection(); + } + + /** Use HTTP Range header for specifying offset. */ + @Override + protected HttpURLConnection openConnection(final long offset) throws IOException { + final HttpURLConnection conn = openConnection(); + conn.setRequestMethod("GET"); + if (offset != 0L) { + conn.setRequestProperty("Range", "bytes=" + offset + "-"); + } + return conn; + } + } + + static class RangeHeaderInputStream extends ByteRangeInputStream { + RangeHeaderInputStream(RangeHeaderUrlOpener o, RangeHeaderUrlOpener r) { + super(o, r); + } + + RangeHeaderInputStream(final URL url) { + this(new RangeHeaderUrlOpener(url), new RangeHeaderUrlOpener(null)); + } + + /** Expects HTTP_OK and HTTP_PARTIAL response codes. */ + @Override + protected void checkResponseCode(final HttpURLConnection connection + ) throws IOException { + final int code = connection.getResponseCode(); + if (startPos != 0 && code != HttpURLConnection.HTTP_PARTIAL) { + // We asked for a byte range but did not receive a partial content + // response... + throw new IOException("HTTP_PARTIAL expected, received " + code); + } else if (startPos == 0 && code != HttpURLConnection.HTTP_OK) { + // We asked for all bytes from the beginning but didn't receive a 200 + // response (none of the other 2xx codes are valid here) + throw new IOException("HTTP_OK expected, received " + code); + } + } + + @Override + protected URL getResolvedUrl(final HttpURLConnection connection) { + return connection.getURL(); + } + } + @Override public FSDataInputStream open(Path f, int buffersize) throws IOException { f = f.makeQualified(getUri(), getWorkingDirectory()); String path = "/data" + ServletUtil.encodePath(f.toUri().getPath()); String query = addDelegationTokenParam("ugi=" + getEncodedUgiParameter()); URL u = getNamenodeURL(path, query); - return new FSDataInputStream(new ByteRangeInputStream(u)); + return new FSDataInputStream(new RangeHeaderInputStream(u)); } /** Class to parse and store a listing reply from the server. */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java index e8c00ca005..ba10ee79fc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java @@ -350,9 +350,7 @@ public class DatanodeWebHdfsMethods { } }; - final int status = offset.getValue() == 0? - HttpServletResponse.SC_OK: HttpServletResponse.SC_PARTIAL_CONTENT; - return Response.status(status).entity(streaming).type( + return Response.ok(streaming).type( MediaType.APPLICATION_OCTET_STREAM).build(); } case GETFILECHECKSUM: diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java index 84b44640ea..963538f506 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java @@ -42,8 +42,6 @@ import javax.ws.rs.QueryParam; import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; -import javax.ws.rs.core.Response.ResponseBuilder; -import javax.ws.rs.core.Response.Status; import javax.ws.rs.core.StreamingOutput; import org.apache.commons.logging.Log; @@ -68,7 +66,6 @@ import org.apache.hadoop.hdfs.web.resources.AccessTimeParam; import org.apache.hadoop.hdfs.web.resources.BlockSizeParam; import org.apache.hadoop.hdfs.web.resources.BufferSizeParam; import org.apache.hadoop.hdfs.web.resources.DelegationParam; -import org.apache.hadoop.hdfs.web.resources.TokenArgumentParam; import org.apache.hadoop.hdfs.web.resources.DeleteOpParam; import org.apache.hadoop.hdfs.web.resources.DestinationParam; import org.apache.hadoop.hdfs.web.resources.GetOpParam; @@ -87,6 +84,7 @@ import org.apache.hadoop.hdfs.web.resources.RecursiveParam; import org.apache.hadoop.hdfs.web.resources.RenameOptionSetParam; import org.apache.hadoop.hdfs.web.resources.RenewerParam; import org.apache.hadoop.hdfs.web.resources.ReplicationParam; +import org.apache.hadoop.hdfs.web.resources.TokenArgumentParam; import org.apache.hadoop.hdfs.web.resources.UriFsPathParam; import org.apache.hadoop.hdfs.web.resources.UserParam; import org.apache.hadoop.net.NodeBase; @@ -153,8 +151,7 @@ public class NamenodeWebHdfsMethods { final NameNode namenode, final UserGroupInformation ugi, final String renewer) throws IOException { final Credentials c = DelegationTokenSecretManager.createCredentials( - namenode, ugi, - renewer != null? renewer: request.getUserPrincipal().getName()); + namenode, ugi, renewer != null? renewer: ugi.getShortUserName()); final Token t = c.getAllTokens().iterator().next(); t.setKind(WebHdfsFileSystem.TOKEN_KIND); SecurityUtil.setTokenService(t, namenode.getNameNodeAddress()); @@ -325,8 +322,7 @@ public class NamenodeWebHdfsMethods { { final boolean b = np.setReplication(fullpath, replication.getValue(conf)); final String js = JsonUtil.toJsonString("boolean", b); - final ResponseBuilder r = b? Response.ok(): Response.status(Status.FORBIDDEN); - return r.entity(js).type(MediaType.APPLICATION_JSON).build(); + return Response.ok(js).type(MediaType.APPLICATION_JSON).build(); } case SETOWNER: { @@ -509,7 +505,7 @@ public class NamenodeWebHdfsMethods { op.getValue(), offset.getValue(), offset, length, bufferSize); return Response.temporaryRedirect(uri).build(); } - case GETFILEBLOCKLOCATIONS: + case GET_BLOCK_LOCATIONS: { final long offsetValue = offset.getValue(); final Long lengthValue = length.getValue(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtil.java index d166d63a98..2e464b2d34 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtil.java @@ -134,6 +134,14 @@ public class JsonUtil { return new FsPermission(Short.parseShort(s, 8)); } + static enum PathType { + FILE, DIRECTORY, SYMLINK; + + static PathType valueOf(HdfsFileStatus status) { + return status.isDir()? DIRECTORY: status.isSymlink()? SYMLINK: FILE; + } + } + /** Convert a HdfsFileStatus object to a Json string. */ public static String toJsonString(final HdfsFileStatus status, boolean includeType) { @@ -142,13 +150,12 @@ public class JsonUtil { } final Map m = new TreeMap(); m.put("localName", status.getLocalName()); - m.put("isDir", status.isDir()); - m.put("isSymlink", status.isSymlink()); + m.put("type", PathType.valueOf(status)); if (status.isSymlink()) { m.put("symlink", status.getSymlink()); } - m.put("len", status.getLen()); + m.put("length", status.getLen()); m.put("owner", status.getOwner()); m.put("group", status.getGroup()); m.put("permission", toString(status.getPermission())); @@ -169,12 +176,11 @@ public class JsonUtil { final Map m = includesType ? (Map)json.get(HdfsFileStatus.class.getSimpleName()) : json; final String localName = (String) m.get("localName"); - final boolean isDir = (Boolean) m.get("isDir"); - final boolean isSymlink = (Boolean) m.get("isSymlink"); - final byte[] symlink = isSymlink? - DFSUtil.string2Bytes((String)m.get("symlink")): null; + final PathType type = PathType.valueOf((String) m.get("type")); + final byte[] symlink = type != PathType.SYMLINK? null + : DFSUtil.string2Bytes((String)m.get("symlink")); - final long len = (Long) m.get("len"); + final long len = (Long) m.get("length"); final String owner = (String) m.get("owner"); final String group = (String) m.get("group"); final FsPermission permission = toFsPermission((String) m.get("permission")); @@ -182,8 +188,8 @@ public class JsonUtil { final long mTime = (Long) m.get("modificationTime"); final long blockSize = (Long) m.get("blockSize"); final short replication = (short) (long) (Long) m.get("replication"); - return new HdfsFileStatus(len, isDir, replication, blockSize, mTime, aTime, - permission, owner, group, + return new HdfsFileStatus(len, type == PathType.DIRECTORY, replication, + blockSize, mTime, aTime, permission, owner, group, symlink, DFSUtil.string2Bytes(localName)); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java index b37c0ed6e0..1699d86db5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java @@ -25,12 +25,14 @@ import java.io.InputStream; import java.io.InputStreamReader; import java.net.HttpURLConnection; import java.net.InetSocketAddress; +import java.net.MalformedURLException; import java.net.URI; import java.net.URISyntaxException; import java.net.URL; import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.StringTokenizer; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -62,7 +64,6 @@ import org.apache.hadoop.hdfs.server.namenode.SafeModeException; import org.apache.hadoop.hdfs.web.resources.AccessTimeParam; import org.apache.hadoop.hdfs.web.resources.BlockSizeParam; import org.apache.hadoop.hdfs.web.resources.BufferSizeParam; -import org.apache.hadoop.hdfs.web.resources.TokenArgumentParam; import org.apache.hadoop.hdfs.web.resources.DeleteOpParam; import org.apache.hadoop.hdfs.web.resources.DestinationParam; import org.apache.hadoop.hdfs.web.resources.GetOpParam; @@ -81,6 +82,7 @@ import org.apache.hadoop.hdfs.web.resources.RecursiveParam; import org.apache.hadoop.hdfs.web.resources.RenameOptionSetParam; import org.apache.hadoop.hdfs.web.resources.RenewerParam; import org.apache.hadoop.hdfs.web.resources.ReplicationParam; +import org.apache.hadoop.hdfs.web.resources.TokenArgumentParam; import org.apache.hadoop.hdfs.web.resources.UserParam; import org.apache.hadoop.io.Text; import org.apache.hadoop.ipc.RemoteException; @@ -388,9 +390,9 @@ public class WebHdfsFileSystem extends FileSystem private FileStatus makeQualified(HdfsFileStatus f, Path parent) { return new FileStatus(f.getLen(), f.isDir(), f.getReplication(), - f.getBlockSize(), f.getModificationTime(), - f.getAccessTime(), + f.getBlockSize(), f.getModificationTime(), f.getAccessTime(), f.getPermission(), f.getOwner(), f.getGroup(), + f.isSymlink() ? new Path(f.getSymlink()) : null, f.getFullPath(parent).makeQualified(getUri(), getWorkingDirectory())); } @@ -532,24 +534,84 @@ public class WebHdfsFileSystem extends FileSystem statistics.incrementReadOps(1); final HttpOpParam.Op op = GetOpParam.Op.OPEN; final URL url = toUrl(op, f, new BufferSizeParam(buffersize)); - ByteRangeInputStream str = getByteRangeInputStream(url); - return new FSDataInputStream(str); + return new FSDataInputStream(new OffsetUrlInputStream( + new OffsetUrlOpener(url), new OffsetUrlOpener(null))); } - private class URLOpener extends ByteRangeInputStream.URLOpener { - - public URLOpener(URL u) { - super(u); + class OffsetUrlOpener extends ByteRangeInputStream.URLOpener { + /** The url with offset parameter */ + private URL offsetUrl; + + OffsetUrlOpener(final URL url) { + super(url); } + /** Open connection with offset url. */ @Override - public HttpURLConnection openConnection() throws IOException { + protected HttpURLConnection openConnection() throws IOException { return getHttpUrlConnection(offsetUrl); } + + /** Setup offset url before open connection. */ + @Override + protected HttpURLConnection openConnection(final long offset) throws IOException { + offsetUrl = offset == 0L? url: new URL(url + "&" + new OffsetParam(offset)); + final HttpURLConnection conn = openConnection(); + conn.setRequestMethod("GET"); + return conn; + } } - - private ByteRangeInputStream getByteRangeInputStream(URL url) { - return new ByteRangeInputStream(new URLOpener(url), new URLOpener(null)); + + private static final String OFFSET_PARAM_PREFIX = OffsetParam.NAME + "="; + + /** Remove offset parameter, if there is any, from the url */ + static URL removeOffsetParam(final URL url) throws MalformedURLException { + String query = url.getQuery(); + if (query == null) { + return url; + } + final String lower = query.toLowerCase(); + if (!lower.startsWith(OFFSET_PARAM_PREFIX) + && !lower.contains("&" + OFFSET_PARAM_PREFIX)) { + return url; + } + + //rebuild query + StringBuilder b = null; + for(final StringTokenizer st = new StringTokenizer(query, "&"); + st.hasMoreTokens();) { + final String token = st.nextToken(); + if (!token.toLowerCase().startsWith(OFFSET_PARAM_PREFIX)) { + if (b == null) { + b = new StringBuilder("?").append(token); + } else { + b.append('&').append(token); + } + } + } + query = b == null? "": b.toString(); + + final String urlStr = url.toString(); + return new URL(urlStr.substring(0, urlStr.indexOf('?')) + query); + } + + static class OffsetUrlInputStream extends ByteRangeInputStream { + OffsetUrlInputStream(URLOpener o, URLOpener r) { + super(o, r); + } + + @Override + protected void checkResponseCode(final HttpURLConnection connection + ) throws IOException { + validateResponse(GetOpParam.Op.OPEN, connection); + } + + /** Remove offset parameter before returning the resolved url. */ + @Override + protected URL getResolvedUrl(final HttpURLConnection connection + ) throws MalformedURLException { + return removeOffsetParam(connection.getURL()); + } } @Override @@ -641,7 +703,7 @@ public class WebHdfsFileSystem extends FileSystem final long offset, final long length) throws IOException { statistics.incrementReadOps(1); - final HttpOpParam.Op op = GetOpParam.Op.GETFILEBLOCKLOCATIONS; + final HttpOpParam.Op op = GetOpParam.Op.GET_BLOCK_LOCATIONS; final Map m = run(op, p, new OffsetParam(offset), new LengthParam(length)); return DFSUtil.locatedBlocks2Locations(JsonUtil.toLocatedBlocks(m)); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/GetOpParam.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/GetOpParam.java index cab71c99d2..1958473cb4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/GetOpParam.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/GetOpParam.java @@ -24,7 +24,6 @@ public class GetOpParam extends HttpOpParam { /** Get operations. */ public static enum Op implements HttpOpParam.Op { OPEN(HttpURLConnection.HTTP_OK), - GETFILEBLOCKLOCATIONS(HttpURLConnection.HTTP_OK), GETFILESTATUS(HttpURLConnection.HTTP_OK), LISTSTATUS(HttpURLConnection.HTTP_OK), @@ -33,6 +32,9 @@ public class GetOpParam extends HttpOpParam { GETDELEGATIONTOKEN(HttpURLConnection.HTTP_OK), + /** GET_BLOCK_LOCATIONS is a private unstable op. */ + GET_BLOCK_LOCATIONS(HttpURLConnection.HTTP_OK), + NULL(HttpURLConnection.HTTP_NOT_IMPLEMENTED); final int expectedHttpResponseCode; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestByteRangeInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestByteRangeInputStream.java index 3be88d3e5c..9e1b73bbd8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestByteRangeInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestByteRangeInputStream.java @@ -31,10 +31,10 @@ import java.io.InputStream; import java.net.HttpURLConnection; import java.net.URL; -import org.apache.hadoop.hdfs.ByteRangeInputStream.URLOpener; import org.junit.Test; -class MockHttpURLConnection extends HttpURLConnection { +public class TestByteRangeInputStream { +public static class MockHttpURLConnection extends HttpURLConnection { public MockHttpURLConnection(URL u) { super(u); } @@ -85,54 +85,18 @@ class MockHttpURLConnection extends HttpURLConnection { responseCode = resCode; } } - -public class TestByteRangeInputStream { - @Test - public void testRemoveOffset() throws IOException { - { //no offset - String s = "http://test/Abc?Length=99"; - assertEquals(s, ByteRangeInputStream.removeOffsetParam(new URL(s)).toString()); - } - - { //no parameters - String s = "http://test/Abc"; - assertEquals(s, ByteRangeInputStream.removeOffsetParam(new URL(s)).toString()); - } - - { //offset as first parameter - String s = "http://test/Abc?offset=10&Length=99"; - assertEquals("http://test/Abc?Length=99", - ByteRangeInputStream.removeOffsetParam(new URL(s)).toString()); - } - - { //offset as second parameter - String s = "http://test/Abc?op=read&OFFset=10&Length=99"; - assertEquals("http://test/Abc?op=read&Length=99", - ByteRangeInputStream.removeOffsetParam(new URL(s)).toString()); - } - - { //offset as last parameter - String s = "http://test/Abc?Length=99&offset=10"; - assertEquals("http://test/Abc?Length=99", - ByteRangeInputStream.removeOffsetParam(new URL(s)).toString()); - } - - { //offset as the only parameter - String s = "http://test/Abc?offset=10"; - assertEquals("http://test/Abc", - ByteRangeInputStream.removeOffsetParam(new URL(s)).toString()); - } - } @Test public void testByteRange() throws IOException { - URLOpener ospy = spy(new URLOpener(new URL("http://test/"))); + HftpFileSystem.RangeHeaderUrlOpener ospy = spy( + new HftpFileSystem.RangeHeaderUrlOpener(new URL("http://test/"))); doReturn(new MockHttpURLConnection(ospy.getURL())).when(ospy) .openConnection(); - URLOpener rspy = spy(new URLOpener((URL) null)); + HftpFileSystem.RangeHeaderUrlOpener rspy = spy( + new HftpFileSystem.RangeHeaderUrlOpener((URL) null)); doReturn(new MockHttpURLConnection(rspy.getURL())).when(rspy) .openConnection(); - ByteRangeInputStream is = new ByteRangeInputStream(ospy, rspy); + ByteRangeInputStream is = new HftpFileSystem.RangeHeaderInputStream(ospy, rspy); assertEquals("getPos wrong", 0, is.getPos()); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestOffsetUrlInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestOffsetUrlInputStream.java new file mode 100644 index 0000000000..4ef0dd680e --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestOffsetUrlInputStream.java @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.web; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.io.IOException; +import java.net.URI; +import java.net.URL; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hdfs.TestByteRangeInputStream.MockHttpURLConnection; +import org.apache.hadoop.hdfs.web.WebHdfsFileSystem.OffsetUrlInputStream; +import org.apache.hadoop.hdfs.web.WebHdfsFileSystem.OffsetUrlOpener; +import org.junit.Test; + +public class TestOffsetUrlInputStream { + @Test + public void testRemoveOffset() throws IOException { + { //no offset + String s = "http://test/Abc?Length=99"; + assertEquals(s, WebHdfsFileSystem.removeOffsetParam(new URL(s)).toString()); + } + + { //no parameters + String s = "http://test/Abc"; + assertEquals(s, WebHdfsFileSystem.removeOffsetParam(new URL(s)).toString()); + } + + { //offset as first parameter + String s = "http://test/Abc?offset=10&Length=99"; + assertEquals("http://test/Abc?Length=99", + WebHdfsFileSystem.removeOffsetParam(new URL(s)).toString()); + } + + { //offset as second parameter + String s = "http://test/Abc?op=read&OFFset=10&Length=99"; + assertEquals("http://test/Abc?op=read&Length=99", + WebHdfsFileSystem.removeOffsetParam(new URL(s)).toString()); + } + + { //offset as last parameter + String s = "http://test/Abc?Length=99&offset=10"; + assertEquals("http://test/Abc?Length=99", + WebHdfsFileSystem.removeOffsetParam(new URL(s)).toString()); + } + + { //offset as the only parameter + String s = "http://test/Abc?offset=10"; + assertEquals("http://test/Abc", + WebHdfsFileSystem.removeOffsetParam(new URL(s)).toString()); + } + } + + @Test + public void testByteRange() throws Exception { + final Configuration conf = new Configuration(); + final String uri = WebHdfsFileSystem.SCHEME + "://localhost:50070/"; + final WebHdfsFileSystem webhdfs = (WebHdfsFileSystem)FileSystem.get(new URI(uri), conf); + + OffsetUrlOpener ospy = spy(webhdfs.new OffsetUrlOpener(new URL("http://test/"))); + doReturn(new MockHttpURLConnection(ospy.getURL())).when(ospy) + .openConnection(); + OffsetUrlOpener rspy = spy(webhdfs.new OffsetUrlOpener((URL) null)); + doReturn(new MockHttpURLConnection(rspy.getURL())).when(rspy) + .openConnection(); + final OffsetUrlInputStream is = new OffsetUrlInputStream(ospy, rspy); + + assertEquals("getPos wrong", 0, is.getPos()); + + is.read(); + + assertNull("Initial call made incorrectly (Range Check)", ospy + .openConnection().getRequestProperty("Range")); + + assertEquals("getPos should be 1 after reading one byte", 1, is.getPos()); + + is.read(); + + assertEquals("getPos should be 2 after reading two bytes", 2, is.getPos()); + + // No additional connections should have been made (no seek) + + rspy.setURL(new URL("http://resolvedurl/")); + + is.seek(100); + is.read(); + + assertEquals("getPos should be 101 after reading one byte", 101, + is.getPos()); + + verify(rspy, times(1)).openConnection(); + + is.seek(101); + is.read(); + + verify(rspy, times(1)).openConnection(); + + // Seek to 101 should not result in another request" + + is.seek(2500); + is.read(); + + ((MockHttpURLConnection) rspy.openConnection()).setResponseCode(206); + is.seek(0); + + try { + is.read(); + fail("Exception should be thrown when 206 response is given " + + "but 200 is expected"); + } catch (IOException e) { + WebHdfsFileSystem.LOG.info(e.toString()); + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsFileSystemContract.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsFileSystemContract.java index 7d990bded5..3d366423b5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsFileSystemContract.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsFileSystemContract.java @@ -280,7 +280,7 @@ public class TestWebHdfsFileSystemContract extends FileSystemContractBaseTest { final HttpURLConnection conn = (HttpURLConnection) url.openConnection(); conn.setRequestMethod(op.getType().toString()); conn.connect(); - assertEquals(HttpServletResponse.SC_FORBIDDEN, conn.getResponseCode()); + assertEquals(HttpServletResponse.SC_OK, conn.getResponseCode()); assertFalse(webhdfs.setReplication(dir, (short)1)); conn.disconnect();