diff --git a/hdfs/CHANGES.txt b/hdfs/CHANGES.txt index 28d9d0b195..41f3dda698 100644 --- a/hdfs/CHANGES.txt +++ b/hdfs/CHANGES.txt @@ -532,6 +532,8 @@ Trunk (unreleased changes) HDFS-1723. quota errors messages should use the same scale. (Jim Plush via atm) + HDFS-2110. StreamFile and ByteRangeInputStream cleanup. (eli) + OPTIMIZATIONS HDFS-1458. Improve checkpoint performance by avoiding unnecessary image diff --git a/hdfs/src/java/org/apache/hadoop/hdfs/ByteRangeInputStream.java b/hdfs/src/java/org/apache/hadoop/hdfs/ByteRangeInputStream.java index 937d34e2bb..fcdf6cb564 100644 --- a/hdfs/src/java/org/apache/hadoop/hdfs/ByteRangeInputStream.java +++ b/hdfs/src/java/org/apache/hadoop/hdfs/ByteRangeInputStream.java @@ -26,7 +26,6 @@ import org.apache.hadoop.fs.FSInputStream; import org.apache.hadoop.hdfs.server.namenode.StreamFile; - /** * To support HTTP byte streams, a new connection to an HTTP server needs to be * created each time. This class hides the complexity of those multiple @@ -60,7 +59,9 @@ public HttpURLConnection openConnection() throws IOException { } } - + enum StreamStatus { + NORMAL, SEEK + } protected InputStream in; protected URLOpener originalURL; protected URLOpener resolvedURL; @@ -68,9 +69,7 @@ public HttpURLConnection openConnection() throws IOException { protected long currentPos = 0; protected long filelength; - protected int status = STATUS_SEEK; - protected static final int STATUS_NORMAL = 0; - protected static final int STATUS_SEEK = 1; + StreamStatus status = StreamStatus.SEEK; ByteRangeInputStream(final URL url) { this(new URLOpener(url), new URLOpener(null)); @@ -82,18 +81,19 @@ public HttpURLConnection openConnection() throws IOException { } private InputStream getInputStream() throws IOException { - if (status != STATUS_NORMAL) { + if (status != StreamStatus.NORMAL) { if (in != null) { in.close(); in = null; } - // use the original url if no resolved url exists (e.g., if it's - // the first time a request is made) - final URLOpener o = resolvedURL.getURL() == null? originalURL: resolvedURL; + // Use the original url if no resolved url exists, eg. if + // it's the first time a request is made. + final URLOpener opener = + (resolvedURL.getURL() == null) ? originalURL : resolvedURL; - final HttpURLConnection connection = o.openConnection(); + final HttpURLConnection connection = opener.openConnection(); try { connection.setRequestMethod("GET"); if (startPos != 0) { @@ -101,36 +101,35 @@ private InputStream getInputStream() throws IOException { } connection.connect(); final String cl = connection.getHeaderField(StreamFile.CONTENT_LENGTH); - filelength = cl == null? -1: Long.parseLong(cl); + filelength = (cl == null) ? -1 : Long.parseLong(cl); if (HftpFileSystem.LOG.isDebugEnabled()) { HftpFileSystem.LOG.debug("filelength = " + filelength); } in = connection.getInputStream(); - } catch(IOException ioe) { + } catch (IOException ioe) { HftpFileSystem.throwIOExceptionFromConnection(connection, ioe); } - if (startPos != 0 && connection.getResponseCode() != 206) { - // we asked for a byte range but did not receive a partial content + int respCode = connection.getResponseCode(); + if (startPos != 0 && respCode != HttpURLConnection.HTTP_PARTIAL) { + // We asked for a byte range but did not receive a partial content // response... - throw new IOException("206 expected, but received " - + connection.getResponseCode()); - } else if(startPos == 0 && connection.getResponseCode() != 200) { - // we asked for all bytes from the beginning but didn't receive a 200 + throw new IOException("HTTP_PARTIAL expected, received " + respCode); + } else if (startPos == 0 && respCode != HttpURLConnection.HTTP_OK) { + // We asked for all bytes from the beginning but didn't receive a 200 // response (none of the other 2xx codes are valid here) - throw new IOException("200 expected, but received " - + connection.getResponseCode()); + throw new IOException("HTTP_OK expected, received " + respCode); } - + resolvedURL.setURL(connection.getURL()); - status = STATUS_NORMAL; + status = StreamStatus.NORMAL; } return in; } - private void update(final boolean isEOF, final int n - ) throws IOException { + private void update(final boolean isEOF, final int n) + throws IOException { if (!isEOF) { currentPos += n; } else if (currentPos < filelength) { @@ -154,7 +153,7 @@ public void seek(long pos) throws IOException { if (pos != currentPos) { startPos = pos; currentPos = pos; - status = STATUS_SEEK; + status = StreamStatus.SEEK; } } @@ -162,7 +161,7 @@ public void seek(long pos) throws IOException { * Return the current offset from the start of the file */ public long getPos() throws IOException { - return currentPos; // keep total count? + return currentPos; } /** @@ -172,7 +171,4 @@ public long getPos() throws IOException { public boolean seekToNewSource(long targetPos) throws IOException { return false; } - -} - - +} \ No newline at end of file diff --git a/hdfs/src/java/org/apache/hadoop/hdfs/HftpFileSystem.java b/hdfs/src/java/org/apache/hadoop/hdfs/HftpFileSystem.java index 684d8e8a39..0c9e001303 100644 --- a/hdfs/src/java/org/apache/hadoop/hdfs/HftpFileSystem.java +++ b/hdfs/src/java/org/apache/hadoop/hdfs/HftpFileSystem.java @@ -245,17 +245,26 @@ public URI getUri() { } } - - /* - Construct URL pointing to file on namenode - */ - URL getNamenodeFileURL(Path f) throws IOException { - return getNamenodeURL("/data" + f.toUri().getPath(), "ugi=" + getUgiParameter()); + /** + * Return a URL pointing to given path on the namenode. + * + * @param p path to obtain the URL for + * @return namenode URL referring to the given path + * @throws IOException on error constructing the URL + */ + URL getNamenodeFileURL(Path p) throws IOException { + return getNamenodeURL("/data" + p.toUri().getPath(), + "ugi=" + getUgiParameter()); } - /* - Construct URL pointing to namenode. - */ + /** + * Return a URL pointing to given path on the namenode. + * + * @param path to obtain the URL for + * @param query string to append to the path + * @return namenode URL referring to the given path + * @throws IOException on error constructing the URL + */ URL getNamenodeURL(String path, String query) throws IOException { try { final URL url = new URI("http", null, nnAddr.getHostName(), diff --git a/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/StreamFile.java b/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/StreamFile.java index f447ea7ebc..ea686a8826 100644 --- a/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/StreamFile.java +++ b/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/StreamFile.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdfs.server.namenode; import java.io.IOException; +import java.io.InputStream; import java.io.OutputStream; import java.io.PrintWriter; import java.util.Enumeration; @@ -36,6 +37,7 @@ import org.apache.hadoop.hdfs.server.common.JspHelper; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DatanodeJspHelper; +import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.security.UserGroupInformation; import org.mortbay.jetty.InclusiveByteRange; @@ -96,7 +98,7 @@ public void doGet(HttpServletRequest request, HttpServletResponse response) filename + "\""); response.setContentType("application/octet-stream"); response.setHeader(CONTENT_LENGTH, "" + fileLen); - StreamFile.writeTo(in, os, 0L, fileLen); + StreamFile.copyFromOffset(in, os, 0L, fileLen); } } catch(IOException e) { if (LOG.isDebugEnabled()) { @@ -113,75 +115,46 @@ public void doGet(HttpServletRequest request, HttpServletResponse response) } } + /** + * Send a partial content response with the given range. If there are + * no satisfiable ranges, or if multiple ranges are requested, which + * is unsupported, respond with range not satisfiable. + * + * @param in stream to read from + * @param out stream to write to + * @param response http response to use + * @param contentLength for the response header + * @param ranges to write to respond with + * @throws IOException on error sending the response + */ static void sendPartialData(FSInputStream in, - OutputStream os, + OutputStream out, HttpServletResponse response, long contentLength, List ranges) - throws IOException { - + throws IOException { if (ranges == null || ranges.size() != 1) { - // if there are no satisfiable ranges, or if multiple ranges are - // requested (we don't support multiple range requests), send 416 response response.setContentLength(0); - int status = HttpServletResponse.SC_REQUESTED_RANGE_NOT_SATISFIABLE; - response.setStatus(status); - response.setHeader("Content-Range", + response.setStatus(HttpServletResponse.SC_REQUESTED_RANGE_NOT_SATISFIABLE); + response.setHeader("Content-Range", InclusiveByteRange.to416HeaderRangeString(contentLength)); } else { - // if there is only a single valid range (must be satisfiable - // since were here now), send that range with a 206 response InclusiveByteRange singleSatisfiableRange = (InclusiveByteRange)ranges.get(0); long singleLength = singleSatisfiableRange.getSize(contentLength); response.setStatus(HttpServletResponse.SC_PARTIAL_CONTENT); response.setHeader("Content-Range", singleSatisfiableRange.toHeaderRangeString(contentLength)); - System.out.println("first: "+singleSatisfiableRange.getFirst(contentLength)); - System.out.println("singleLength: "+singleLength); - - StreamFile.writeTo(in, - os, - singleSatisfiableRange.getFirst(contentLength), - singleLength); + copyFromOffset(in, out, + singleSatisfiableRange.getFirst(contentLength), + singleLength); } } - - static void writeTo(FSInputStream in, - OutputStream os, - long start, - long count) - throws IOException { - byte buf[] = new byte[4096]; - long bytesRemaining = count; - int bytesRead; - int bytesToRead; - in.seek(start); - - while (true) { - // number of bytes to read this iteration - bytesToRead = (int)(bytesRemaining