diff --git a/CHANGES.txt b/CHANGES.txt
index f5a660da13..e1e80ab828 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -162,6 +162,10 @@ Trunk (unreleased changes)
     HADOOP-6120. Add support for Avro specific and reflect data.
     (sharad via cutting)
 
+    HADOOP-6226. Moves BoundedByteArrayOutputStream from the tfile package to
+    the io package and makes it available to other users (MAPREDUCE-318).
+    (Jothi Padmanabhan via ddas)
+
   IMPROVEMENTS
 
     HADOOP-4565. Added CombineFileInputFormat to use data locality information
diff --git a/src/java/org/apache/hadoop/io/file/tfile/BoundedByteArrayOutputStream.java b/src/java/org/apache/hadoop/io/BoundedByteArrayOutputStream.java
similarity index 77%
rename from src/java/org/apache/hadoop/io/file/tfile/BoundedByteArrayOutputStream.java
rename to src/java/org/apache/hadoop/io/BoundedByteArrayOutputStream.java
index b0302f050a..1d8fa7358e 100644
--- a/src/java/org/apache/hadoop/io/file/tfile/BoundedByteArrayOutputStream.java
+++ b/src/java/org/apache/hadoop/io/BoundedByteArrayOutputStream.java
@@ -15,7 +15,7 @@
  * the License.
  */
 
-package org.apache.hadoop.io.file.tfile;
+package org.apache.hadoop.io;
 
 import java.io.EOFException;
 import java.io.IOException;
@@ -26,15 +26,26 @@
  * than the buffer capacity. The object can be reused through reset
  * API and choose different limits in each round.
  */
-class BoundedByteArrayOutputStream extends OutputStream {
+public class BoundedByteArrayOutputStream extends OutputStream {
   private final byte[] buffer;
   private int limit;
   private int count;
 
+  /**
+   * Create a BoundedByteArrayOutputStream with the specified
+   * capacity.
+   * @param capacity The capacity of the underlying byte array
+   */
   public BoundedByteArrayOutputStream(int capacity) {
     this(capacity, capacity);
   }
 
+  /**
+   * Create a BoundedByteArrayOutputStream with the specified
+   * capacity and limit.
+   * @param capacity The capacity of the underlying byte array
+   * @param limit The maximum limit up to which data can be written
+   */
   public BoundedByteArrayOutputStream(int capacity, int limit) {
     if ((capacity < limit) || (capacity | limit) < 0) {
       throw new IllegalArgumentException("Invalid capacity/limit");
@@ -69,6 +80,10 @@ public void write(byte b[], int off, int len) throws IOException {
     count += len;
   }
 
+  /**
+   * Reset the limit.
+   * @param newlim New limit
+   */
   public void reset(int newlim) {
     if (newlim > buffer.length) {
       throw new IndexOutOfBoundsException("Limit exceeds buffer size");
@@ -77,19 +92,27 @@ public void reset(int newlim) {
     this.count = 0;
   }
 
+  /** Reset the buffer */
   public void reset() {
     this.limit = buffer.length;
     this.count = 0;
   }
 
+  /** Return the current limit */
   public int getLimit() {
     return limit;
   }
 
+  /** Returns the underlying buffer.
+   *  Data is only valid to {@link #size()}.
+   */
   public byte[] getBuffer() {
     return buffer;
   }
 
+  /** Returns the length of the valid data
+   *  currently in the buffer.
+   */
   public int size() {
     return count;
   }
diff --git a/src/java/org/apache/hadoop/io/file/tfile/TFile.java b/src/java/org/apache/hadoop/io/file/tfile/TFile.java
index 8b79e99dd0..8ef1ecfc0a 100644
--- a/src/java/org/apache/hadoop/io/file/tfile/TFile.java
+++ b/src/java/org/apache/hadoop/io/file/tfile/TFile.java
@@ -34,6 +34,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.io.BoundedByteArrayOutputStream;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
diff --git a/src/test/core/org/apache/hadoop/io/TestBoundedByteArrayOutputStream.java b/src/test/core/org/apache/hadoop/io/TestBoundedByteArrayOutputStream.java
new file mode 100644
index 0000000000..a00d38bfa6
--- /dev/null
+++ b/src/test/core/org/apache/hadoop/io/TestBoundedByteArrayOutputStream.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io;
+
+import java.io.IOException;
+
+import junit.framework.TestCase;
+import java.util.Arrays;
+import java.util.Random;
+
+
+/** Unit tests for BoundedByteArrayOutputStream */
+public class TestBoundedByteArrayOutputStream extends TestCase {
+
+  private static final int SIZE = 1024;
+  private static final byte[] INPUT = new byte[SIZE];
+  static {
+    new Random().nextBytes(INPUT);
+  }
+
+  public void testBoundedStream() throws IOException {
+
+    BoundedByteArrayOutputStream stream =
+        new BoundedByteArrayOutputStream(SIZE);
+
+    // Write to the stream, get the data back and check for contents
+    stream.write(INPUT, 0, SIZE);
+    assertTrue("Array Contents Mismatch",
+        Arrays.equals(INPUT, stream.getBuffer()));
+
+    // Try writing beyond end of buffer. Should throw an exception
+    boolean caughtException = false;
+
+    try {
+      stream.write(INPUT[0]);
+    } catch (Exception e) {
+      caughtException = true;
+    }
+
+    assertTrue("Writing beyond limit did not throw an exception",
+        caughtException);
+
+    // Reset the stream and try again; this should succeed
+    stream.reset();
+    assertTrue("Limit did not get reset correctly",
+        (stream.getLimit() == SIZE));
+    stream.write(INPUT, 0, SIZE);
+    assertTrue("Array Contents Mismatch",
+        Arrays.equals(INPUT, stream.getBuffer()));
+
+    // Try writing one more byte, should fail
+    caughtException = false;
+    try {
+      stream.write(INPUT[0]);
+    } catch (Exception e) {
+      caughtException = true;
+    }
+    assertTrue("Writing beyond limit did not throw an exception",
+        caughtException);
+
+    // Reset the stream, but set a lower limit. Writing beyond
+    // the limit should throw an exception
+    stream.reset(SIZE - 1);
+    assertTrue("Limit did not get reset correctly",
+        (stream.getLimit() == SIZE - 1));
+    caughtException = false;
+
+    try {
+      stream.write(INPUT, 0, SIZE);
+    } catch (Exception e) {
+      caughtException = true;
+    }
+
+    assertTrue("Writing beyond limit did not throw an exception",
+        caughtException);
+  }
+}
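
For context, a minimal usage sketch of the relocated class (not part of the patch): it assumes only the public API shown above -- the two constructors, write(), reset(int), getLimit(), getBuffer() and size(). The example class name and payload bytes are hypothetical, made up for illustration.

    import java.io.IOException;

    import org.apache.hadoop.io.BoundedByteArrayOutputStream;

    public class BoundedStreamExample {
      public static void main(String[] args) throws IOException {
        // Hypothetical payload; any byte source would do.
        byte[] payload = "hello, bounded world".getBytes("UTF-8");

        // Capacity of 64 bytes, but cap this round of writes at 32.
        BoundedByteArrayOutputStream out =
            new BoundedByteArrayOutputStream(64, 32);
        out.write(payload, 0, payload.length);

        // Only the first size() bytes of getBuffer() hold valid data.
        System.out.println("wrote " + out.size()
            + " of " + out.getLimit() + " permitted bytes");

        // Reuse the same backing array with a tighter limit; writing
        // more than 16 bytes would now throw an EOFException.
        out.reset(16);
      }
    }

Before this patch the class was package-private to org.apache.hadoop.io.file.tfile, so a client like the one above could not compile against it; the move to org.apache.hadoop.io plus the public modifier is what enables reuse (e.g. by MAPREDUCE-318).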