Added Data(In,Out)putByteBuffer to work with ByteBuffer similar to Data(In,Out)putBuffer for byte[]. Contributed by Chris Douglas.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1143606 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Matthew Foley 2011-07-06 23:14:33 +00:00
parent 952d970480
commit 96e62036bf
3 changed files with 378 additions and 0 deletions

View File

@ -0,0 +1,82 @@
package org.apache.hadoop.io;
import java.io.DataInputStream;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.LinkedList;
import java.util.List;
public class DataInputByteBuffer extends DataInputStream {
private static class Buffer extends InputStream {
private final byte[] scratch = new byte[1];
ByteBuffer[] buffers = new ByteBuffer[0];
int bidx, pos, length;
@Override
public int read() {
if (-1 == read(scratch, 0, 1)) {
return -1;
}
return scratch[0] & 0xFF;
}
@Override
public int read(byte[] b, int off, int len) {
if (bidx >= buffers.length) {
return -1;
}
int cur = 0;
do {
int rem = Math.min(len, buffers[bidx].remaining());
buffers[bidx].get(b, off, rem);
cur += rem;
off += rem;
len -= rem;
} while (len > 0 && ++bidx < buffers.length);
pos += cur;
return cur;
}
public void reset(ByteBuffer[] buffers) {
bidx = pos = length = 0;
this.buffers = buffers;
for (ByteBuffer b : buffers) {
length += b.remaining();
}
}
public int getPosition() {
return pos;
}
public int getLength() {
return length;
}
public ByteBuffer[] getData() {
return buffers;
}
}
private Buffer buffers;
public DataInputByteBuffer() {
this(new Buffer());
}
private DataInputByteBuffer(Buffer buffers) {
super(buffers);
this.buffers = buffers;
}
public void reset(ByteBuffer... input) {
buffers.reset(input);
}
public ByteBuffer[] getData() {
return buffers.getData();
}
public int getPosition() {
return buffers.getPosition();
}
public int getLength() {
return buffers.getLength();
}
}

View File

@ -0,0 +1,119 @@
package org.apache.hadoop.io;
import java.io.DataOutputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.ArrayList;
import java.util.LinkedList;
public class DataOutputByteBuffer extends DataOutputStream {
static class Buffer extends OutputStream {
final byte[] b = new byte[1];
final boolean direct;
final List<ByteBuffer> active = new ArrayList<ByteBuffer>();
final List<ByteBuffer> inactive = new LinkedList<ByteBuffer>();
int size;
int length;
ByteBuffer current;
Buffer(int size, boolean direct) {
this.direct = direct;
this.size = size;
current = direct
? ByteBuffer.allocateDirect(size)
: ByteBuffer.allocate(size);
}
@Override
public void write(int b) {
this.b[0] = (byte)(b & 0xFF);
write(this.b);
}
@Override
public void write(byte[] b) {
write(b, 0, b.length);
}
@Override
public void write(byte[] b, int off, int len) {
int rem = current.remaining();
while (len > rem) {
current.put(b, off, rem);
length += rem;
current.flip();
active.add(current);
off += rem;
len -= rem;
rem = getBuffer(len);
}
current.put(b, off, len);
length += len;
}
int getBuffer(int newsize) {
if (inactive.isEmpty()) {
size = Math.max(size << 1, newsize);
current = direct
? ByteBuffer.allocateDirect(size)
: ByteBuffer.allocate(size);
} else {
current = inactive.remove(0);
}
return current.remaining();
}
ByteBuffer[] getData() {
ByteBuffer[] ret = active.toArray(new ByteBuffer[active.size() + 1]);
ByteBuffer tmp = current.duplicate();
tmp.flip();
ret[ret.length - 1] = tmp.slice();
return ret;
}
int getLength() {
return length;
}
void reset() {
length = 0;
current.rewind();
inactive.add(0, current);
for (int i = active.size() - 1; i >= 0; --i) {
ByteBuffer b = active.remove(i);
b.rewind();
inactive.add(0, b);
}
current = inactive.remove(0);
}
}
private final Buffer buffers;
public DataOutputByteBuffer() {
this(32);
}
public DataOutputByteBuffer(int size) {
this(size, false);
}
public DataOutputByteBuffer(int size, boolean direct) {
this(new Buffer(size, direct));
}
private DataOutputByteBuffer(Buffer buffers) {
super(buffers);
this.buffers = buffers;
}
public ByteBuffer[] getData() {
return buffers.getData();
}
public int getLength() {
return buffers.getLength();
}
public void reset() {
this.written = 0;
buffers.reset();
}
}

View File

@ -0,0 +1,177 @@
package org.apache.hadoop.io;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Random;
import org.junit.Test;
import static org.junit.Assert.*;
public class TestDataByteBuffers {
private static void readJunk(DataInput in, Random r, long seed, int iter)
throws IOException {
r.setSeed(seed);
for (int i = 0; i < iter; ++i) {
switch (r.nextInt(7)) {
case 0:
assertEquals((byte)(r.nextInt() & 0xFF), in.readByte()); break;
case 1:
assertEquals((short)(r.nextInt() & 0xFFFF), in.readShort()); break;
case 2:
assertEquals(r.nextInt(), in.readInt()); break;
case 3:
assertEquals(r.nextLong(), in.readLong()); break;
case 4:
assertEquals(Double.doubleToLongBits(r.nextDouble()),
Double.doubleToLongBits(in.readDouble())); break;
case 5:
assertEquals(Float.floatToIntBits(r.nextFloat()),
Float.floatToIntBits(in.readFloat())); break;
case 6:
int len = r.nextInt(1024);
byte[] vb = new byte[len];
r.nextBytes(vb);
byte[] b = new byte[len];
in.readFully(b, 0, len);
assertArrayEquals(vb, b);
break;
}
}
}
private static void writeJunk(DataOutput out, Random r, long seed, int iter)
throws IOException {
r.setSeed(seed);
for (int i = 0; i < iter; ++i) {
switch (r.nextInt(7)) {
case 0: out.writeByte(r.nextInt()); break;
case 1: out.writeShort((short)(r.nextInt() & 0xFFFF)); break;
case 2: out.writeInt(r.nextInt()); break;
case 3: out.writeLong(r.nextLong()); break;
case 4: out.writeDouble(r.nextDouble()); break;
case 5: out.writeFloat(r.nextFloat()); break;
case 6:
byte[] b = new byte[r.nextInt(1024)];
r.nextBytes(b);
out.write(b);
break;
}
}
}
@Test
public void testBaseBuffers() throws IOException {
DataOutputBuffer dob = new DataOutputBuffer();
Random r = new Random();
long seed = r.nextLong();
r.setSeed(seed);
System.out.println("SEED: " + seed);
writeJunk(dob, r, seed, 1000);
DataInputBuffer dib = new DataInputBuffer();
dib.reset(dob.getData(), 0, dob.getLength());
readJunk(dib, r, seed, 1000);
dob.reset();
writeJunk(dob, r, seed, 1000);
dib.reset(dob.getData(), 0, dob.getLength());
readJunk(dib, r, seed, 1000);
}
@Test
public void testByteBuffers() throws IOException {
DataOutputByteBuffer dob = new DataOutputByteBuffer();
Random r = new Random();
long seed = r.nextLong();
r.setSeed(seed);
System.out.println("SEED: " + seed);
writeJunk(dob, r, seed, 1000);
DataInputByteBuffer dib = new DataInputByteBuffer();
dib.reset(dob.getData());
readJunk(dib, r, seed, 1000);
dob.reset();
writeJunk(dob, r, seed, 1000);
dib.reset(dob.getData());
readJunk(dib, r, seed, 1000);
}
private static byte[] toBytes(ByteBuffer[] bufs, int len) {
byte[] ret = new byte[len];
int pos = 0;
for (int i = 0; i < bufs.length; ++i) {
int rem = bufs[i].remaining();
bufs[i].get(ret, pos, rem);
pos += rem;
}
return ret;
}
@Test
public void testDataOutputByteBufferCompatibility() throws IOException {
DataOutputBuffer dob = new DataOutputBuffer();
DataOutputByteBuffer dobb = new DataOutputByteBuffer();
Random r = new Random();
long seed = r.nextLong();
r.setSeed(seed);
System.out.println("SEED: " + seed);
writeJunk(dob, r, seed, 1000);
writeJunk(dobb, r, seed, 1000);
byte[] check = toBytes(dobb.getData(), dobb.getLength());
assertEquals(dob.getLength(), check.length);
assertArrayEquals(Arrays.copyOf(dob.getData(), dob.getLength()), check);
dob.reset();
dobb.reset();
writeJunk(dob, r, seed, 3000);
writeJunk(dobb, r, seed, 3000);
check = toBytes(dobb.getData(), dobb.getLength());
assertEquals(dob.getLength(), check.length);
assertArrayEquals(Arrays.copyOf(dob.getData(), dob.getLength()), check);
dob.reset();
dobb.reset();
writeJunk(dob, r, seed, 1000);
writeJunk(dobb, r, seed, 1000);
check = toBytes(dobb.getData(), dobb.getLength());
assertEquals(dob.getLength(), check.length);
assertArrayEquals(Arrays.copyOf(dob.getData(), dob.getLength()), check);
}
@Test
public void TestDataInputByteBufferCompatibility() throws IOException {
DataOutputBuffer dob = new DataOutputBuffer();
Random r = new Random();
long seed = r.nextLong();
r.setSeed(seed);
System.out.println("SEED: " + seed);
writeJunk(dob, r, seed, 1000);
ByteBuffer buf = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
DataInputByteBuffer dib = new DataInputByteBuffer();
dib.reset(buf);
readJunk(dib, r, seed, 1000);
}
@Test
public void TestDataOutputByteBufferCompatibility() throws IOException {
DataOutputByteBuffer dob = new DataOutputByteBuffer();
Random r = new Random();
long seed = r.nextLong();
r.setSeed(seed);
System.out.println("SEED: " + seed);
writeJunk(dob, r, seed, 1000);
ByteBuffer buf = ByteBuffer.allocate(dob.getLength());
for (ByteBuffer b : dob.getData()) {
buf.put(b);
}
buf.flip();
DataInputBuffer dib = new DataInputBuffer();
dib.reset(buf.array(), 0, buf.remaining());
readJunk(dib, r, seed, 1000);
}
}