From f86352c2dff7e75314a8fc08c32f4a37c098cc62 Mon Sep 17 00:00:00 2001 From: Tsz-wo Sze Date: Wed, 25 Apr 2012 19:24:00 +0000 Subject: [PATCH] HDFS-3318. Use BoundedInputStream in ByteRangeInputStream, otherwise, it hangs on transfers >2 GB. Contributed by Daryn Sharp git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1330500 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 +++ .../hadoop/hdfs/ByteRangeInputStream.java | 25 ++++++++++++++----- .../hadoop/hdfs/TestByteRangeInputStream.java | 6 +++++ 3 files changed, 28 insertions(+), 6 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index a03920e9bc..ad5b9aa9d7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -898,6 +898,9 @@ Release 0.23.3 - UNRELEASED HDFS-3312. In HftpFileSystem, the namenode URI is non-secure but the delegation tokens have to use secure URI. (Daryn Sharp via szetszwo) + HDFS-3318. Use BoundedInputStream in ByteRangeInputStream, otherwise, it + hangs on transfers >2 GB. (Daryn Sharp via szetszwo) + Release 0.23.2 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ByteRangeInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ByteRangeInputStream.java index 36c7c1dec0..eac1ab7555 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ByteRangeInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ByteRangeInputStream.java @@ -23,6 +23,7 @@ import java.net.HttpURLConnection; import java.net.URL; +import org.apache.commons.io.input.BoundedInputStream; import org.apache.hadoop.fs.FSInputStream; import org.apache.hadoop.hdfs.server.namenode.StreamFile; @@ -106,8 +107,14 @@ private InputStream getInputStream() throws IOException { checkResponseCode(connection); final String cl = connection.getHeaderField(StreamFile.CONTENT_LENGTH); - filelength = (cl == null) ? -1 : Long.parseLong(cl); - in = connection.getInputStream(); + if (cl == null) { + throw new IOException(StreamFile.CONTENT_LENGTH+" header is missing"); + } + final long streamlength = Long.parseLong(cl); + filelength = startPos + streamlength; + // Java has a bug with >2GB request streams. It won't bounds check + // the reads so the transfer blocks until the server times out + in = new BoundedInputStream(connection.getInputStream(), streamlength); resolvedURL.setURL(getResolvedUrl(connection)); status = StreamStatus.NORMAL; @@ -116,21 +123,27 @@ private InputStream getInputStream() throws IOException { return in; } - private void update(final boolean isEOF, final int n) - throws IOException { - if (!isEOF) { + private int update(final int n) throws IOException { + if (n != -1) { currentPos += n; } else if (currentPos < filelength) { throw new IOException("Got EOF but currentPos = " + currentPos + " < filelength = " + filelength); } + return n; } + @Override public int read() throws IOException { final int b = getInputStream().read(); - update(b == -1, 1); + update((b == -1) ? -1 : 1); return b; } + + @Override + public int read(byte b[], int off, int len) throws IOException { + return update(getInputStream().read(b, off, len)); + } /** * Seek to the given offset from the start of the file. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestByteRangeInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestByteRangeInputStream.java index 9e1b73bbd8..f30525d06f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestByteRangeInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestByteRangeInputStream.java @@ -31,6 +31,7 @@ import java.net.HttpURLConnection; import java.net.URL; +import org.apache.hadoop.hdfs.server.namenode.StreamFile; import org.junit.Test; public class TestByteRangeInputStream { @@ -84,6 +85,11 @@ public int getResponseCode() { public void setResponseCode(int resCode) { responseCode = resCode; } + + @Override + public String getHeaderField(String field) { + return (field.equalsIgnoreCase(StreamFile.CONTENT_LENGTH)) ? "65535" : null; + } } @Test