HDFS-3334. Fix ByteRangeInputStream stream leakage. Contributed by Daryn Sharp
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1331570 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
84b36cfd80
commit
920b8fac18
@ -917,6 +917,8 @@ Release 0.23.3 - UNRELEASED
|
|||||||
|
|
||||||
HDFS-3321. Fix safe mode turn off tip message. (Ravi Prakash via szetszwo)
|
HDFS-3321. Fix safe mode turn off tip message. (Ravi Prakash via szetszwo)
|
||||||
|
|
||||||
|
HDFS-3334. Fix ByteRangeInputStream stream leakage. (Daryn Sharp via szetszwo)
|
||||||
|
|
||||||
Release 0.23.2 - UNRELEASED
|
Release 0.23.2 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
@ -27,6 +27,8 @@
|
|||||||
import org.apache.hadoop.fs.FSInputStream;
|
import org.apache.hadoop.fs.FSInputStream;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.StreamFile;
|
import org.apache.hadoop.hdfs.server.namenode.StreamFile;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* To support HTTP byte streams, a new connection to an HTTP server needs to be
|
* To support HTTP byte streams, a new connection to an HTTP server needs to be
|
||||||
* created each time. This class hides the complexity of those multiple
|
* created each time. This class hides the complexity of those multiple
|
||||||
@ -61,7 +63,7 @@ public URL getURL() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
enum StreamStatus {
|
enum StreamStatus {
|
||||||
NORMAL, SEEK
|
NORMAL, SEEK, CLOSED
|
||||||
}
|
}
|
||||||
protected InputStream in;
|
protected InputStream in;
|
||||||
protected URLOpener originalURL;
|
protected URLOpener originalURL;
|
||||||
@ -89,40 +91,51 @@ protected abstract void checkResponseCode(final HttpURLConnection connection
|
|||||||
protected abstract URL getResolvedUrl(final HttpURLConnection connection
|
protected abstract URL getResolvedUrl(final HttpURLConnection connection
|
||||||
) throws IOException;
|
) throws IOException;
|
||||||
|
|
||||||
private InputStream getInputStream() throws IOException {
|
@VisibleForTesting
|
||||||
if (status != StreamStatus.NORMAL) {
|
protected InputStream getInputStream() throws IOException {
|
||||||
|
switch (status) {
|
||||||
if (in != null) {
|
case NORMAL:
|
||||||
in.close();
|
break;
|
||||||
in = null;
|
case SEEK:
|
||||||
}
|
if (in != null) {
|
||||||
|
in.close();
|
||||||
// Use the original url if no resolved url exists, eg. if
|
}
|
||||||
// it's the first time a request is made.
|
in = openInputStream();
|
||||||
final URLOpener opener =
|
status = StreamStatus.NORMAL;
|
||||||
(resolvedURL.getURL() == null) ? originalURL : resolvedURL;
|
break;
|
||||||
|
case CLOSED:
|
||||||
final HttpURLConnection connection = opener.openConnection(startPos);
|
throw new IOException("Stream closed");
|
||||||
connection.connect();
|
|
||||||
checkResponseCode(connection);
|
|
||||||
|
|
||||||
final String cl = connection.getHeaderField(StreamFile.CONTENT_LENGTH);
|
|
||||||
if (cl == null) {
|
|
||||||
throw new IOException(StreamFile.CONTENT_LENGTH+" header is missing");
|
|
||||||
}
|
|
||||||
final long streamlength = Long.parseLong(cl);
|
|
||||||
filelength = startPos + streamlength;
|
|
||||||
// Java has a bug with >2GB request streams. It won't bounds check
|
|
||||||
// the reads so the transfer blocks until the server times out
|
|
||||||
in = new BoundedInputStream(connection.getInputStream(), streamlength);
|
|
||||||
|
|
||||||
resolvedURL.setURL(getResolvedUrl(connection));
|
|
||||||
status = StreamStatus.NORMAL;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return in;
|
return in;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
protected InputStream openInputStream() throws IOException {
|
||||||
|
// Use the original url if no resolved url exists, e.g. if
|
||||||
|
// it's the first time a request is made.
|
||||||
|
final URLOpener opener =
|
||||||
|
(resolvedURL.getURL() == null) ? originalURL : resolvedURL;
|
||||||
|
|
||||||
|
final HttpURLConnection connection = opener.openConnection(startPos);
|
||||||
|
connection.connect();
|
||||||
|
checkResponseCode(connection);
|
||||||
|
|
||||||
|
final String cl = connection.getHeaderField(StreamFile.CONTENT_LENGTH);
|
||||||
|
if (cl == null) {
|
||||||
|
throw new IOException(StreamFile.CONTENT_LENGTH+" header is missing");
|
||||||
|
}
|
||||||
|
final long streamlength = Long.parseLong(cl);
|
||||||
|
filelength = startPos + streamlength;
|
||||||
|
// Java has a bug with >2GB request streams. It won't bounds check
|
||||||
|
// the reads so the transfer blocks until the server times out
|
||||||
|
InputStream is =
|
||||||
|
new BoundedInputStream(connection.getInputStream(), streamlength);
|
||||||
|
|
||||||
|
resolvedURL.setURL(getResolvedUrl(connection));
|
||||||
|
|
||||||
|
return is;
|
||||||
|
}
|
||||||
|
|
||||||
private int update(final int n) throws IOException {
|
private int update(final int n) throws IOException {
|
||||||
if (n != -1) {
|
if (n != -1) {
|
||||||
currentPos += n;
|
currentPos += n;
|
||||||
@ -150,17 +163,21 @@ public int read(byte b[], int off, int len) throws IOException {
|
|||||||
* The next read() will be from that location. Can't
|
* The next read() will be from that location. Can't
|
||||||
* seek past the end of the file.
|
* seek past the end of the file.
|
||||||
*/
|
*/
|
||||||
|
@Override
|
||||||
public void seek(long pos) throws IOException {
|
public void seek(long pos) throws IOException {
|
||||||
if (pos != currentPos) {
|
if (pos != currentPos) {
|
||||||
startPos = pos;
|
startPos = pos;
|
||||||
currentPos = pos;
|
currentPos = pos;
|
||||||
status = StreamStatus.SEEK;
|
if (status != StreamStatus.CLOSED) {
|
||||||
|
status = StreamStatus.SEEK;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Return the current offset from the start of the file
|
* Return the current offset from the start of the file
|
||||||
*/
|
*/
|
||||||
|
@Override
|
||||||
public long getPos() throws IOException {
|
public long getPos() throws IOException {
|
||||||
return currentPos;
|
return currentPos;
|
||||||
}
|
}
|
||||||
@ -169,7 +186,17 @@ public long getPos() throws IOException {
|
|||||||
* Seeks a different copy of the data. Returns true if
|
* Seeks a different copy of the data. Returns true if
|
||||||
* found a new source, false otherwise.
|
* found a new source, false otherwise.
|
||||||
*/
|
*/
|
||||||
|
@Override
|
||||||
public boolean seekToNewSource(long targetPos) throws IOException {
|
public boolean seekToNewSource(long targetPos) throws IOException {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() throws IOException {
|
||||||
|
if (in != null) {
|
||||||
|
in.close();
|
||||||
|
in = null;
|
||||||
|
}
|
||||||
|
status = StreamStatus.CLOSED;
|
||||||
|
}
|
||||||
}
|
}
|
@ -19,8 +19,10 @@
|
|||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertNull;
|
import static org.junit.Assert.assertNull;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
import static org.mockito.Mockito.doReturn;
|
import static org.mockito.Mockito.doReturn;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.spy;
|
import static org.mockito.Mockito.spy;
|
||||||
import static org.mockito.Mockito.times;
|
import static org.mockito.Mockito.times;
|
||||||
import static org.mockito.Mockito.verify;
|
import static org.mockito.Mockito.verify;
|
||||||
@ -169,4 +171,74 @@ public void testByteRange() throws IOException {
|
|||||||
"HTTP_OK expected, received 206", e.getMessage());
|
"HTTP_OK expected, received 206", e.getMessage());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPropagatedClose() throws IOException {
|
||||||
|
ByteRangeInputStream brs = spy(
|
||||||
|
new HftpFileSystem.RangeHeaderInputStream(new URL("http://test/")));
|
||||||
|
|
||||||
|
InputStream mockStream = mock(InputStream.class);
|
||||||
|
doReturn(mockStream).when(brs).openInputStream();
|
||||||
|
|
||||||
|
int brisOpens = 0;
|
||||||
|
int brisCloses = 0;
|
||||||
|
int isCloses = 0;
|
||||||
|
|
||||||
|
// first open, shouldn't close underlying stream
|
||||||
|
brs.getInputStream();
|
||||||
|
verify(brs, times(++brisOpens)).openInputStream();
|
||||||
|
verify(brs, times(brisCloses)).close();
|
||||||
|
verify(mockStream, times(isCloses)).close();
|
||||||
|
|
||||||
|
// stream is open, shouldn't close underlying stream
|
||||||
|
brs.getInputStream();
|
||||||
|
verify(brs, times(brisOpens)).openInputStream();
|
||||||
|
verify(brs, times(brisCloses)).close();
|
||||||
|
verify(mockStream, times(isCloses)).close();
|
||||||
|
|
||||||
|
// seek forces a reopen, should close underlying stream
|
||||||
|
brs.seek(1);
|
||||||
|
brs.getInputStream();
|
||||||
|
verify(brs, times(++brisOpens)).openInputStream();
|
||||||
|
verify(brs, times(brisCloses)).close();
|
||||||
|
verify(mockStream, times(++isCloses)).close();
|
||||||
|
|
||||||
|
// verify that the underlying stream isn't closed after a seek
|
||||||
|
// i.e. the state was correctly updated
|
||||||
|
brs.getInputStream();
|
||||||
|
verify(brs, times(brisOpens)).openInputStream();
|
||||||
|
verify(brs, times(brisCloses)).close();
|
||||||
|
verify(mockStream, times(isCloses)).close();
|
||||||
|
|
||||||
|
// seeking to same location should be a no-op
|
||||||
|
brs.seek(1);
|
||||||
|
brs.getInputStream();
|
||||||
|
verify(brs, times(brisOpens)).openInputStream();
|
||||||
|
verify(brs, times(brisCloses)).close();
|
||||||
|
verify(mockStream, times(isCloses)).close();
|
||||||
|
|
||||||
|
// close should of course close
|
||||||
|
brs.close();
|
||||||
|
verify(brs, times(++brisCloses)).close();
|
||||||
|
verify(mockStream, times(++isCloses)).close();
|
||||||
|
|
||||||
|
// it's already closed, underlying stream should not close
|
||||||
|
brs.close();
|
||||||
|
verify(brs, times(++brisCloses)).close();
|
||||||
|
verify(mockStream, times(isCloses)).close();
|
||||||
|
|
||||||
|
// it's closed, don't reopen it
|
||||||
|
boolean errored = false;
|
||||||
|
try {
|
||||||
|
brs.getInputStream();
|
||||||
|
} catch (IOException e) {
|
||||||
|
errored = true;
|
||||||
|
assertEquals("Stream closed", e.getMessage());
|
||||||
|
} finally {
|
||||||
|
assertTrue("Read a closed steam", errored);
|
||||||
|
}
|
||||||
|
verify(brs, times(brisOpens)).openInputStream();
|
||||||
|
verify(brs, times(brisCloses)).close();
|
||||||
|
verify(mockStream, times(isCloses)).close();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -19,6 +19,7 @@
|
|||||||
package org.apache.hadoop.hdfs;
|
package org.apache.hadoop.hdfs;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.io.InputStream;
|
||||||
import java.net.URISyntaxException;
|
import java.net.URISyntaxException;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.net.URL;
|
import java.net.URL;
|
||||||
@ -234,6 +235,45 @@ public void testSeek() throws IOException {
|
|||||||
assertEquals('7', in.read());
|
assertEquals('7', in.read());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testReadClosedStream() throws IOException {
|
||||||
|
final Path testFile = new Path("/testfile+2");
|
||||||
|
FSDataOutputStream os = hdfs.create(testFile, true);
|
||||||
|
os.writeBytes("0123456789");
|
||||||
|
os.close();
|
||||||
|
|
||||||
|
// ByteRangeInputStream delays opens until reads. Make sure it doesn't
|
||||||
|
// open a closed stream that has never been opened
|
||||||
|
FSDataInputStream in = hftpFs.open(testFile);
|
||||||
|
in.close();
|
||||||
|
checkClosedStream(in);
|
||||||
|
checkClosedStream(in.getWrappedStream());
|
||||||
|
|
||||||
|
// force the stream to connect and then close it
|
||||||
|
in = hftpFs.open(testFile);
|
||||||
|
int ch = in.read();
|
||||||
|
assertEquals('0', ch);
|
||||||
|
in.close();
|
||||||
|
checkClosedStream(in);
|
||||||
|
checkClosedStream(in.getWrappedStream());
|
||||||
|
|
||||||
|
// make sure seeking doesn't automagically reopen the stream
|
||||||
|
in.seek(4);
|
||||||
|
checkClosedStream(in);
|
||||||
|
checkClosedStream(in.getWrappedStream());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void checkClosedStream(InputStream is) {
|
||||||
|
IOException ioe = null;
|
||||||
|
try {
|
||||||
|
is.read();
|
||||||
|
} catch (IOException e) {
|
||||||
|
ioe = e;
|
||||||
|
}
|
||||||
|
assertNotNull("No exception on closed read", ioe);
|
||||||
|
assertEquals("Stream closed", ioe.getMessage());
|
||||||
|
}
|
||||||
|
|
||||||
public void resetFileSystem() throws IOException {
|
public void resetFileSystem() throws IOException {
|
||||||
// filesystem caching has a quirk/bug that it caches based on the user's
|
// filesystem caching has a quirk/bug that it caches based on the user's
|
||||||
// given uri. the result is if a filesystem is instantiated with no port,
|
// given uri. the result is if a filesystem is instantiated with no port,
|
||||||
|
Loading…
Reference in New Issue
Block a user