HADOOP-9276. Allow BoundedByteArrayOutputStream to be resettable. Contributed by Arun Murthy

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1442312 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hitesh Shah 2013-02-04 19:42:51 +00:00
parent 52e6f5a276
commit 114e23f7bd
3 changed files with 91 additions and 16 deletions

View File

@ -467,6 +467,9 @@ Release 2.0.3-alpha - Unreleased
HADOOP-9231. Parametrize staging URL for the uniformity of HADOOP-9231. Parametrize staging URL for the uniformity of
distributionManagement. (Konstantin Boudnik via suresh) distributionManagement. (Konstantin Boudnik via suresh)
HADOOP-9276. Allow BoundedByteArrayOutputStream to be resettable.
(Arun Murthy via hitesh)
OPTIMIZATIONS OPTIMIZATIONS
HADOOP-8866. SampleQuantiles#query is O(N^2) instead of O(N). (Andrew Wang HADOOP-8866. SampleQuantiles#query is O(N^2) instead of O(N). (Andrew Wang

View File

@ -32,9 +32,10 @@
@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"}) @InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})
@InterfaceStability.Unstable @InterfaceStability.Unstable
public class BoundedByteArrayOutputStream extends OutputStream { public class BoundedByteArrayOutputStream extends OutputStream {
private final byte[] buffer; private byte[] buffer;
private int startOffset;
private int limit; private int limit;
private int count; private int currentPointer;
/** /**
* Create a BoundedByteArrayOutputStream with the specified * Create a BoundedByteArrayOutputStream with the specified
@ -52,20 +53,30 @@ public BoundedByteArrayOutputStream(int capacity) {
* @param limit The maximum limit upto which data can be written * @param limit The maximum limit upto which data can be written
*/ */
public BoundedByteArrayOutputStream(int capacity, int limit) { public BoundedByteArrayOutputStream(int capacity, int limit) {
this(new byte[capacity], 0, limit);
}
protected BoundedByteArrayOutputStream(byte[] buf, int offset, int limit) {
resetBuffer(buf, offset, limit);
}
protected void resetBuffer(byte[] buf, int offset, int limit) {
int capacity = buf.length - offset;
if ((capacity < limit) || (capacity | limit) < 0) { if ((capacity < limit) || (capacity | limit) < 0) {
throw new IllegalArgumentException("Invalid capacity/limit"); throw new IllegalArgumentException("Invalid capacity/limit");
} }
this.buffer = new byte[capacity]; this.buffer = buf;
this.limit = limit; this.startOffset = offset;
this.count = 0; this.currentPointer = offset;
this.limit = offset + limit;
} }
@Override @Override
public void write(int b) throws IOException { public void write(int b) throws IOException {
if (count >= limit) { if (currentPointer >= limit) {
throw new EOFException("Reaching the limit of the buffer."); throw new EOFException("Reaching the limit of the buffer.");
} }
buffer[count++] = (byte) b; buffer[currentPointer++] = (byte) b;
} }
@Override @Override
@ -77,12 +88,12 @@ public void write(byte b[], int off, int len) throws IOException {
return; return;
} }
if (count + len > limit) { if (currentPointer + len > limit) {
throw new EOFException("Reach the limit of the buffer"); throw new EOFException("Reach the limit of the buffer");
} }
System.arraycopy(b, off, buffer, count, len); System.arraycopy(b, off, buffer, currentPointer, len);
count += len; currentPointer += len;
} }
/** /**
@ -90,17 +101,17 @@ public void write(byte b[], int off, int len) throws IOException {
* @param newlim New Limit * @param newlim New Limit
*/ */
public void reset(int newlim) { public void reset(int newlim) {
if (newlim > buffer.length) { if (newlim > (buffer.length - startOffset)) {
throw new IndexOutOfBoundsException("Limit exceeds buffer size"); throw new IndexOutOfBoundsException("Limit exceeds buffer size");
} }
this.limit = newlim; this.limit = newlim;
this.count = 0; this.currentPointer = startOffset;
} }
/** Reset the buffer */ /** Reset the buffer */
public void reset() { public void reset() {
this.limit = buffer.length; this.limit = buffer.length - startOffset;
this.count = 0; this.currentPointer = startOffset;
} }
/** Return the current limit */ /** Return the current limit */
@ -119,6 +130,10 @@ public byte[] getBuffer() {
* currently in the buffer. * currently in the buffer.
*/ */
public int size() { public int size() {
return count; return currentPointer - startOffset;
}
public int available() {
return limit - currentPointer;
} }
} }

View File

@ -88,4 +88,61 @@ public void testBoundedStream() throws IOException {
assertTrue("Writing beyond limit did not throw an exception", assertTrue("Writing beyond limit did not throw an exception",
caughtException); caughtException);
} }
static class ResettableBoundedByteArrayOutputStream
extends BoundedByteArrayOutputStream {
public ResettableBoundedByteArrayOutputStream(int capacity) {
super(capacity);
}
public void resetBuffer(byte[] buf, int offset, int length) {
super.resetBuffer(buf, offset, length);
}
}
public void testResetBuffer() throws IOException {
ResettableBoundedByteArrayOutputStream stream =
new ResettableBoundedByteArrayOutputStream(SIZE);
// Write to the stream, get the data back and check for contents
stream.write(INPUT, 0, SIZE);
assertTrue("Array Contents Mismatch",
Arrays.equals(INPUT, stream.getBuffer()));
// Try writing beyond end of buffer. Should throw an exception
boolean caughtException = false;
try {
stream.write(INPUT[0]);
} catch (Exception e) {
caughtException = true;
}
assertTrue("Writing beyond limit did not throw an exception",
caughtException);
//Reset the stream and try, should succeed
byte[] newBuf = new byte[SIZE];
stream.resetBuffer(newBuf, 0, newBuf.length);
assertTrue("Limit did not get reset correctly",
(stream.getLimit() == SIZE));
stream.write(INPUT, 0, SIZE);
assertTrue("Array Contents Mismatch",
Arrays.equals(INPUT, stream.getBuffer()));
// Try writing one more byte, should fail
caughtException = false;
try {
stream.write(INPUT[0]);
} catch (Exception e) {
caughtException = true;
}
assertTrue("Writing beyond limit did not throw an exception",
caughtException);
}
} }