HDFS-2110. StreamFile and ByteRangeInputStream cleanup. Contributed by Eli Collins

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1140694 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Eli Collins 2011-06-28 15:59:31 +00:00
parent 3ebc992e58
commit 7663caab5a
6 changed files with 82 additions and 108 deletions

View File

@ -532,6 +532,8 @@ Trunk (unreleased changes)
HDFS-1723. quota errors messages should use the same scale. (Jim Plush via HDFS-1723. quota errors messages should use the same scale. (Jim Plush via
atm) atm)
HDFS-2110. StreamFile and ByteRangeInputStream cleanup. (eli)
OPTIMIZATIONS OPTIMIZATIONS
HDFS-1458. Improve checkpoint performance by avoiding unnecessary image HDFS-1458. Improve checkpoint performance by avoiding unnecessary image

View File

@ -26,7 +26,6 @@
import org.apache.hadoop.fs.FSInputStream; import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.hdfs.server.namenode.StreamFile; import org.apache.hadoop.hdfs.server.namenode.StreamFile;
/** /**
* To support HTTP byte streams, a new connection to an HTTP server needs to be * To support HTTP byte streams, a new connection to an HTTP server needs to be
* created each time. This class hides the complexity of those multiple * created each time. This class hides the complexity of those multiple
@ -60,7 +59,9 @@ public HttpURLConnection openConnection() throws IOException {
} }
} }
enum StreamStatus {
NORMAL, SEEK
}
protected InputStream in; protected InputStream in;
protected URLOpener originalURL; protected URLOpener originalURL;
protected URLOpener resolvedURL; protected URLOpener resolvedURL;
@ -68,9 +69,7 @@ public HttpURLConnection openConnection() throws IOException {
protected long currentPos = 0; protected long currentPos = 0;
protected long filelength; protected long filelength;
protected int status = STATUS_SEEK; StreamStatus status = StreamStatus.SEEK;
protected static final int STATUS_NORMAL = 0;
protected static final int STATUS_SEEK = 1;
ByteRangeInputStream(final URL url) { ByteRangeInputStream(final URL url) {
this(new URLOpener(url), new URLOpener(null)); this(new URLOpener(url), new URLOpener(null));
@ -82,18 +81,19 @@ public HttpURLConnection openConnection() throws IOException {
} }
private InputStream getInputStream() throws IOException { private InputStream getInputStream() throws IOException {
if (status != STATUS_NORMAL) { if (status != StreamStatus.NORMAL) {
if (in != null) { if (in != null) {
in.close(); in.close();
in = null; in = null;
} }
// use the original url if no resolved url exists (e.g., if it's // Use the original url if no resolved url exists, eg. if
// the first time a request is made) // it's the first time a request is made.
final URLOpener o = resolvedURL.getURL() == null? originalURL: resolvedURL; final URLOpener opener =
(resolvedURL.getURL() == null) ? originalURL : resolvedURL;
final HttpURLConnection connection = o.openConnection(); final HttpURLConnection connection = opener.openConnection();
try { try {
connection.setRequestMethod("GET"); connection.setRequestMethod("GET");
if (startPos != 0) { if (startPos != 0) {
@ -101,7 +101,7 @@ private InputStream getInputStream() throws IOException {
} }
connection.connect(); connection.connect();
final String cl = connection.getHeaderField(StreamFile.CONTENT_LENGTH); final String cl = connection.getHeaderField(StreamFile.CONTENT_LENGTH);
filelength = cl == null? -1: Long.parseLong(cl); filelength = (cl == null) ? -1 : Long.parseLong(cl);
if (HftpFileSystem.LOG.isDebugEnabled()) { if (HftpFileSystem.LOG.isDebugEnabled()) {
HftpFileSystem.LOG.debug("filelength = " + filelength); HftpFileSystem.LOG.debug("filelength = " + filelength);
} }
@ -110,27 +110,26 @@ private InputStream getInputStream() throws IOException {
HftpFileSystem.throwIOExceptionFromConnection(connection, ioe); HftpFileSystem.throwIOExceptionFromConnection(connection, ioe);
} }
if (startPos != 0 && connection.getResponseCode() != 206) { int respCode = connection.getResponseCode();
// we asked for a byte range but did not receive a partial content if (startPos != 0 && respCode != HttpURLConnection.HTTP_PARTIAL) {
// We asked for a byte range but did not receive a partial content
// response... // response...
throw new IOException("206 expected, but received " throw new IOException("HTTP_PARTIAL expected, received " + respCode);
+ connection.getResponseCode()); } else if (startPos == 0 && respCode != HttpURLConnection.HTTP_OK) {
} else if(startPos == 0 && connection.getResponseCode() != 200) { // We asked for all bytes from the beginning but didn't receive a 200
// we asked for all bytes from the beginning but didn't receive a 200
// response (none of the other 2xx codes are valid here) // response (none of the other 2xx codes are valid here)
throw new IOException("200 expected, but received " throw new IOException("HTTP_OK expected, received " + respCode);
+ connection.getResponseCode());
} }
resolvedURL.setURL(connection.getURL()); resolvedURL.setURL(connection.getURL());
status = STATUS_NORMAL; status = StreamStatus.NORMAL;
} }
return in; return in;
} }
private void update(final boolean isEOF, final int n private void update(final boolean isEOF, final int n)
) throws IOException { throws IOException {
if (!isEOF) { if (!isEOF) {
currentPos += n; currentPos += n;
} else if (currentPos < filelength) { } else if (currentPos < filelength) {
@ -154,7 +153,7 @@ public void seek(long pos) throws IOException {
if (pos != currentPos) { if (pos != currentPos) {
startPos = pos; startPos = pos;
currentPos = pos; currentPos = pos;
status = STATUS_SEEK; status = StreamStatus.SEEK;
} }
} }
@ -162,7 +161,7 @@ public void seek(long pos) throws IOException {
* Return the current offset from the start of the file * Return the current offset from the start of the file
*/ */
public long getPos() throws IOException { public long getPos() throws IOException {
return currentPos; // keep total count? return currentPos;
} }
/** /**
@ -172,7 +171,4 @@ public long getPos() throws IOException {
public boolean seekToNewSource(long targetPos) throws IOException { public boolean seekToNewSource(long targetPos) throws IOException {
return false; return false;
} }
} }

View File

@ -245,16 +245,25 @@ public URI getUri() {
} }
} }
/**
/* * Return a URL pointing to given path on the namenode.
Construct URL pointing to file on namenode *
* @param p path to obtain the URL for
* @return namenode URL referring to the given path
* @throws IOException on error constructing the URL
*/ */
URL getNamenodeFileURL(Path f) throws IOException { URL getNamenodeFileURL(Path p) throws IOException {
return getNamenodeURL("/data" + f.toUri().getPath(), "ugi=" + getUgiParameter()); return getNamenodeURL("/data" + p.toUri().getPath(),
"ugi=" + getUgiParameter());
} }
/* /**
Construct URL pointing to namenode. * Return a URL pointing to given path on the namenode.
*
* @param path to obtain the URL for
* @param query string to append to the path
* @return namenode URL referring to the given path
* @throws IOException on error constructing the URL
*/ */
URL getNamenodeURL(String path, String query) throws IOException { URL getNamenodeURL(String path, String query) throws IOException {
try { try {

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.namenode; package org.apache.hadoop.hdfs.server.namenode;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.io.PrintWriter; import java.io.PrintWriter;
import java.util.Enumeration; import java.util.Enumeration;
@ -36,6 +37,7 @@
import org.apache.hadoop.hdfs.server.common.JspHelper; import org.apache.hadoop.hdfs.server.common.JspHelper;
import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DatanodeJspHelper; import org.apache.hadoop.hdfs.server.datanode.DatanodeJspHelper;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.mortbay.jetty.InclusiveByteRange; import org.mortbay.jetty.InclusiveByteRange;
@ -96,7 +98,7 @@ public void doGet(HttpServletRequest request, HttpServletResponse response)
filename + "\""); filename + "\"");
response.setContentType("application/octet-stream"); response.setContentType("application/octet-stream");
response.setHeader(CONTENT_LENGTH, "" + fileLen); response.setHeader(CONTENT_LENGTH, "" + fileLen);
StreamFile.writeTo(in, os, 0L, fileLen); StreamFile.copyFromOffset(in, os, 0L, fileLen);
} }
} catch(IOException e) { } catch(IOException e) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
@ -113,75 +115,46 @@ public void doGet(HttpServletRequest request, HttpServletResponse response)
} }
} }
/**
* Send a partial content response with the given range. If there are
* no satisfiable ranges, or if multiple ranges are requested, which
* is unsupported, respond with range not satisfiable.
*
* @param in stream to read from
* @param out stream to write to
* @param response http response to use
* @param contentLength for the response header
* @param ranges to write to respond with
* @throws IOException on error sending the response
*/
static void sendPartialData(FSInputStream in, static void sendPartialData(FSInputStream in,
OutputStream os, OutputStream out,
HttpServletResponse response, HttpServletResponse response,
long contentLength, long contentLength,
List<?> ranges) List<?> ranges)
throws IOException { throws IOException {
if (ranges == null || ranges.size() != 1) { if (ranges == null || ranges.size() != 1) {
// if there are no satisfiable ranges, or if multiple ranges are
// requested (we don't support multiple range requests), send 416 response
response.setContentLength(0); response.setContentLength(0);
int status = HttpServletResponse.SC_REQUESTED_RANGE_NOT_SATISFIABLE; response.setStatus(HttpServletResponse.SC_REQUESTED_RANGE_NOT_SATISFIABLE);
response.setStatus(status);
response.setHeader("Content-Range", response.setHeader("Content-Range",
InclusiveByteRange.to416HeaderRangeString(contentLength)); InclusiveByteRange.to416HeaderRangeString(contentLength));
} else { } else {
// if there is only a single valid range (must be satisfiable
// since were here now), send that range with a 206 response
InclusiveByteRange singleSatisfiableRange = InclusiveByteRange singleSatisfiableRange =
(InclusiveByteRange)ranges.get(0); (InclusiveByteRange)ranges.get(0);
long singleLength = singleSatisfiableRange.getSize(contentLength); long singleLength = singleSatisfiableRange.getSize(contentLength);
response.setStatus(HttpServletResponse.SC_PARTIAL_CONTENT); response.setStatus(HttpServletResponse.SC_PARTIAL_CONTENT);
response.setHeader("Content-Range", response.setHeader("Content-Range",
singleSatisfiableRange.toHeaderRangeString(contentLength)); singleSatisfiableRange.toHeaderRangeString(contentLength));
System.out.println("first: "+singleSatisfiableRange.getFirst(contentLength)); copyFromOffset(in, out,
System.out.println("singleLength: "+singleLength);
StreamFile.writeTo(in,
os,
singleSatisfiableRange.getFirst(contentLength), singleSatisfiableRange.getFirst(contentLength),
singleLength); singleLength);
} }
} }
static void writeTo(FSInputStream in, /* Copy count bytes at the given offset from one stream to another */
OutputStream os, static void copyFromOffset(FSInputStream in, OutputStream out, long offset,
long start, long count) throws IOException {
long count) in.seek(offset);
throws IOException { IOUtils.copyBytes(in, out, count);
byte buf[] = new byte[4096];
long bytesRemaining = count;
int bytesRead;
int bytesToRead;
in.seek(start);
while (true) {
// number of bytes to read this iteration
bytesToRead = (int)(bytesRemaining<buf.length ?
bytesRemaining:
buf.length);
// number of bytes actually read this iteration
bytesRead = in.read(buf, 0, bytesToRead);
// if we can't read anymore, break
if (bytesRead == -1) {
break;
}
os.write(buf, 0, bytesRead);
bytesRemaining -= bytesRead;
// if we don't need to read anymore, break
if (bytesRemaining <= 0) {
break;
}
}
} }
} }

View File

@ -23,12 +23,13 @@
import java.net.HttpURLConnection; import java.net.HttpURLConnection;
import java.net.MalformedURLException; import java.net.MalformedURLException;
import java.net.URL; import java.net.URL;
import junit.framework.TestCase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hdfs.ByteRangeInputStream; import org.apache.hadoop.hdfs.ByteRangeInputStream;
import org.apache.hadoop.hdfs.ByteRangeInputStream.URLOpener; import org.apache.hadoop.hdfs.ByteRangeInputStream.URLOpener;
import org.junit.Test;
import static org.junit.Assert.*;
class MockHttpURLConnection extends HttpURLConnection { class MockHttpURLConnection extends HttpURLConnection {
MockURL m; MockURL m;
@ -101,13 +102,9 @@ public String getMsg() {
} }
} }
public class TestByteRangeInputStream {
@Test
public class TestByteRangeInputStream extends TestCase {
private static final Log LOG =
LogFactory.getLog(TestByteRangeInputStream.class);
public void testByteRange() throws IOException, InterruptedException { public void testByteRange() throws IOException, InterruptedException {
MockURL o = new MockURL("http://test/"); MockURL o = new MockURL("http://test/");
MockURL r = new MockURL((URL)null); MockURL r = new MockURL((URL)null);
@ -168,7 +165,7 @@ public void testByteRange() throws IOException, InterruptedException {
+ "but 206 is expected"); + "but 206 is expected");
} catch (IOException e) { } catch (IOException e) {
assertEquals("Should fail because incorrect response code was sent", assertEquals("Should fail because incorrect response code was sent",
"206 expected, but received 200", e.getMessage()); "HTTP_PARTIAL expected, received 200", e.getMessage());
} }
r.responseCode = 206; r.responseCode = 206;
@ -180,10 +177,7 @@ public void testByteRange() throws IOException, InterruptedException {
+ "but 200 is expected"); + "but 200 is expected");
} catch (IOException e) { } catch (IOException e) {
assertEquals("Should fail because incorrect response code was sent", assertEquals("Should fail because incorrect response code was sent",
"200 expected, but received 206", e.getMessage()); "HTTP_OK expected, received 206", e.getMessage());
} }
} }
} }

View File

@ -219,7 +219,7 @@ public void testWriteTo() throws IOException, InterruptedException {
assertTrue("Pairs array must be even", pairs.length % 2 == 0); assertTrue("Pairs array must be even", pairs.length % 2 == 0);
for (int i = 0; i < pairs.length; i+=2) { for (int i = 0; i < pairs.length; i+=2) {
StreamFile.writeTo(fsin, os, pairs[i], pairs[i+1]); StreamFile.copyFromOffset(fsin, os, pairs[i], pairs[i+1]);
assertArrayEquals("Reading " + pairs[i+1] assertArrayEquals("Reading " + pairs[i+1]
+ " bytes from offset " + pairs[i], + " bytes from offset " + pairs[i],
getOutputArray(pairs[i], pairs[i+1]), getOutputArray(pairs[i], pairs[i+1]),