HDFS-3788. ByteRangeInputStream should not expect HTTP Content-Length header when chunked transfer-encoding is used.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1374122 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
bbe4584ebf
commit
87d87b04e6
@ -625,6 +625,9 @@ Branch-2 ( Unreleased changes )
|
|||||||
HDFS-3808. fuse_dfs: postpone libhdfs intialization until after fork.
|
HDFS-3808. fuse_dfs: postpone libhdfs intialization until after fork.
|
||||||
(Colin Patrick McCabe via atm)
|
(Colin Patrick McCabe via atm)
|
||||||
|
|
||||||
|
HDFS-3788. ByteRangeInputStream should not expect HTTP Content-Length header
|
||||||
|
when chunked transfer-encoding is used. (szetszwo)
|
||||||
|
|
||||||
BREAKDOWN OF HDFS-3042 SUBTASKS
|
BREAKDOWN OF HDFS-3042 SUBTASKS
|
||||||
|
|
||||||
HDFS-2185. HDFS portion of ZK-based FailoverController (todd)
|
HDFS-2185. HDFS portion of ZK-based FailoverController (todd)
|
||||||
|
@ -22,12 +22,15 @@
|
|||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.net.HttpURLConnection;
|
import java.net.HttpURLConnection;
|
||||||
import java.net.URL;
|
import java.net.URL;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.StringTokenizer;
|
||||||
|
|
||||||
import org.apache.commons.io.input.BoundedInputStream;
|
import org.apache.commons.io.input.BoundedInputStream;
|
||||||
import org.apache.hadoop.fs.FSInputStream;
|
import org.apache.hadoop.fs.FSInputStream;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.StreamFile;
|
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
import com.google.common.net.HttpHeaders;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* To support HTTP byte streams, a new connection to an HTTP server needs to be
|
* To support HTTP byte streams, a new connection to an HTTP server needs to be
|
||||||
@ -70,7 +73,7 @@ enum StreamStatus {
|
|||||||
protected URLOpener resolvedURL;
|
protected URLOpener resolvedURL;
|
||||||
protected long startPos = 0;
|
protected long startPos = 0;
|
||||||
protected long currentPos = 0;
|
protected long currentPos = 0;
|
||||||
protected long filelength;
|
protected Long fileLength = null;
|
||||||
|
|
||||||
StreamStatus status = StreamStatus.SEEK;
|
StreamStatus status = StreamStatus.SEEK;
|
||||||
|
|
||||||
@ -114,28 +117,60 @@ protected InputStream openInputStream() throws IOException {
|
|||||||
final URLOpener opener = resolved? resolvedURL: originalURL;
|
final URLOpener opener = resolved? resolvedURL: originalURL;
|
||||||
|
|
||||||
final HttpURLConnection connection = opener.connect(startPos, resolved);
|
final HttpURLConnection connection = opener.connect(startPos, resolved);
|
||||||
final String cl = connection.getHeaderField(StreamFile.CONTENT_LENGTH);
|
|
||||||
if (cl == null) {
|
|
||||||
throw new IOException(StreamFile.CONTENT_LENGTH+" header is missing");
|
|
||||||
}
|
|
||||||
final long streamlength = Long.parseLong(cl);
|
|
||||||
filelength = startPos + streamlength;
|
|
||||||
// Java has a bug with >2GB request streams. It won't bounds check
|
|
||||||
// the reads so the transfer blocks until the server times out
|
|
||||||
InputStream is =
|
|
||||||
new BoundedInputStream(connection.getInputStream(), streamlength);
|
|
||||||
|
|
||||||
resolvedURL.setURL(getResolvedUrl(connection));
|
resolvedURL.setURL(getResolvedUrl(connection));
|
||||||
|
|
||||||
return is;
|
InputStream in = connection.getInputStream();
|
||||||
|
final Map<String, List<String>> headers = connection.getHeaderFields();
|
||||||
|
if (isChunkedTransferEncoding(headers)) {
|
||||||
|
// file length is not known
|
||||||
|
fileLength = null;
|
||||||
|
} else {
|
||||||
|
// for non-chunked transfer-encoding, get content-length
|
||||||
|
final String cl = connection.getHeaderField(HttpHeaders.CONTENT_LENGTH);
|
||||||
|
if (cl == null) {
|
||||||
|
throw new IOException(HttpHeaders.CONTENT_LENGTH + " is missing: "
|
||||||
|
+ headers);
|
||||||
|
}
|
||||||
|
final long streamlength = Long.parseLong(cl);
|
||||||
|
fileLength = startPos + streamlength;
|
||||||
|
|
||||||
|
// Java has a bug with >2GB request streams. It won't bounds check
|
||||||
|
// the reads so the transfer blocks until the server times out
|
||||||
|
in = new BoundedInputStream(in, streamlength);
|
||||||
|
}
|
||||||
|
|
||||||
|
return in;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static boolean isChunkedTransferEncoding(
|
||||||
|
final Map<String, List<String>> headers) {
|
||||||
|
return contains(headers, HttpHeaders.TRANSFER_ENCODING, "chunked")
|
||||||
|
|| contains(headers, HttpHeaders.TE, "chunked");
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Does the HTTP header map contain the given key, value pair? */
|
||||||
|
private static boolean contains(final Map<String, List<String>> headers,
|
||||||
|
final String key, final String value) {
|
||||||
|
final List<String> values = headers.get(key);
|
||||||
|
if (values != null) {
|
||||||
|
for(String v : values) {
|
||||||
|
for(final StringTokenizer t = new StringTokenizer(v, ",");
|
||||||
|
t.hasMoreTokens(); ) {
|
||||||
|
if (value.equalsIgnoreCase(t.nextToken())) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
private int update(final int n) throws IOException {
|
private int update(final int n) throws IOException {
|
||||||
if (n != -1) {
|
if (n != -1) {
|
||||||
currentPos += n;
|
currentPos += n;
|
||||||
} else if (currentPos < filelength) {
|
} else if (fileLength != null && currentPos < fileLength) {
|
||||||
throw new IOException("Got EOF but currentPos = " + currentPos
|
throw new IOException("Got EOF but currentPos = " + currentPos
|
||||||
+ " < filelength = " + filelength);
|
+ " < filelength = " + fileLength);
|
||||||
}
|
}
|
||||||
return n;
|
return n;
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user