HADOOP-15146. Remove DataOutputByteBuffer.

Contributed by BELUGA BEHR.
This commit is contained in:
Steve Loughran 2018-04-02 15:52:40 +01:00
parent a0bde7d525
commit dde1579096
3 changed files with 74 additions and 280 deletions

View File

@ -1,137 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io;
import java.io.DataOutputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.ArrayList;
import java.util.LinkedList;
public class DataOutputByteBuffer extends DataOutputStream {
static class Buffer extends OutputStream {
final byte[] b = new byte[1];
final boolean direct;
final List<ByteBuffer> active = new ArrayList<ByteBuffer>();
final List<ByteBuffer> inactive = new LinkedList<ByteBuffer>();
int size;
int length;
ByteBuffer current;
Buffer(int size, boolean direct) {
this.direct = direct;
this.size = size;
current = direct
? ByteBuffer.allocateDirect(size)
: ByteBuffer.allocate(size);
}
@Override
public void write(int b) {
this.b[0] = (byte)(b & 0xFF);
write(this.b);
}
@Override
public void write(byte[] b) {
write(b, 0, b.length);
}
@Override
public void write(byte[] b, int off, int len) {
int rem = current.remaining();
while (len > rem) {
current.put(b, off, rem);
length += rem;
current.flip();
active.add(current);
off += rem;
len -= rem;
rem = getBuffer(len);
}
current.put(b, off, len);
length += len;
}
int getBuffer(int newsize) {
if (inactive.isEmpty()) {
size = Math.max(size << 1, newsize);
current = direct
? ByteBuffer.allocateDirect(size)
: ByteBuffer.allocate(size);
} else {
current = inactive.remove(0);
}
return current.remaining();
}
ByteBuffer[] getData() {
ByteBuffer[] ret = active.toArray(new ByteBuffer[active.size() + 1]);
ByteBuffer tmp = current.duplicate();
tmp.flip();
ret[ret.length - 1] = tmp.slice();
return ret;
}
int getLength() {
return length;
}
void reset() {
length = 0;
current.rewind();
inactive.add(0, current);
for (int i = active.size() - 1; i >= 0; --i) {
ByteBuffer b = active.remove(i);
b.rewind();
inactive.add(0, b);
}
current = inactive.remove(0);
}
}
private final Buffer buffers;
public DataOutputByteBuffer() {
this(32);
}
public DataOutputByteBuffer(int size) {
this(size, false);
}
public DataOutputByteBuffer(int size, boolean direct) {
this(new Buffer(size, direct));
}
private DataOutputByteBuffer(Buffer buffers) {
super(buffers);
this.buffers = buffers;
}
public ByteBuffer[] getData() {
return buffers.getData();
}
public int getLength() {
return buffers.getLength();
}
public void reset() {
this.written = 0;
buffers.reset();
}
}

View File

@ -22,7 +22,6 @@
import java.io.DataOutput; import java.io.DataOutput;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Random; import java.util.Random;
import org.junit.Test; import org.junit.Test;
@ -30,53 +29,67 @@
public class TestDataByteBuffers { public class TestDataByteBuffers {
private static void readJunk(DataInput in, Random r, long seed, int iter) private static final Random RAND = new Random(31L);
private static void readJunk(DataInput in, int iter)
throws IOException { throws IOException {
r.setSeed(seed); RAND.setSeed(31L);
for (int i = 0; i < iter; ++i) { for (int i = 0; i < iter; ++i) {
switch (r.nextInt(7)) { switch (RAND.nextInt(7)) {
case 0: case 0:
assertEquals((byte)(r.nextInt() & 0xFF), in.readByte()); break; assertEquals((byte)(RAND.nextInt() & 0xFF), in.readByte()); break;
case 1: case 1:
assertEquals((short)(r.nextInt() & 0xFFFF), in.readShort()); break; assertEquals((short)(RAND.nextInt() & 0xFFFF), in.readShort()); break;
case 2: case 2:
assertEquals(r.nextInt(), in.readInt()); break; assertEquals(RAND.nextInt(), in.readInt()); break;
case 3: case 3:
assertEquals(r.nextLong(), in.readLong()); break; assertEquals(RAND.nextLong(), in.readLong()); break;
case 4: case 4:
assertEquals(Double.doubleToLongBits(r.nextDouble()), assertEquals(Double.doubleToLongBits(RAND.nextDouble()),
Double.doubleToLongBits(in.readDouble())); break; Double.doubleToLongBits(in.readDouble()));
case 5: break;
assertEquals(Float.floatToIntBits(r.nextFloat()), case 5:
Float.floatToIntBits(in.readFloat())); break; assertEquals(Float.floatToIntBits(RAND.nextFloat()),
case 6: Float.floatToIntBits(in.readFloat()));
int len = r.nextInt(1024); break;
byte[] vb = new byte[len]; case 6:
r.nextBytes(vb); int len = RAND.nextInt(1024);
byte[] b = new byte[len]; byte[] vb = new byte[len];
in.readFully(b, 0, len); RAND.nextBytes(vb);
assertArrayEquals(vb, b); byte[] b = new byte[len];
break; in.readFully(b, 0, len);
assertArrayEquals(vb, b);
break;
default:
throw new IOException();
} }
} }
} }
private static void writeJunk(DataOutput out, Random r, long seed, int iter) private static void writeJunk(DataOutput out, int iter)
throws IOException { throws IOException {
r.setSeed(seed); RAND.setSeed(31L);
for (int i = 0; i < iter; ++i) { for (int i = 0; i < iter; ++i) {
switch (r.nextInt(7)) { switch (RAND.nextInt(7)) {
case 0: out.writeByte(r.nextInt()); break; case 0:
case 1: out.writeShort((short)(r.nextInt() & 0xFFFF)); break; out.writeByte(RAND.nextInt()); break;
case 2: out.writeInt(r.nextInt()); break; case 1:
case 3: out.writeLong(r.nextLong()); break; out.writeShort((short)(RAND.nextInt() & 0xFFFF)); break;
case 4: out.writeDouble(r.nextDouble()); break; case 2:
case 5: out.writeFloat(r.nextFloat()); break; out.writeInt(RAND.nextInt()); break;
case 6: case 3:
byte[] b = new byte[r.nextInt(1024)]; out.writeLong(RAND.nextLong()); break;
r.nextBytes(b); case 4:
out.write(b); out.writeDouble(RAND.nextDouble()); break;
break; case 5:
out.writeFloat(RAND.nextFloat()); break;
case 6:
byte[] b = new byte[RAND.nextInt(1024)];
RAND.nextBytes(b);
out.write(b);
break;
default:
throw new IOException();
} }
} }
} }
@ -84,113 +97,26 @@ private static void writeJunk(DataOutput out, Random r, long seed, int iter)
@Test @Test
public void testBaseBuffers() throws IOException { public void testBaseBuffers() throws IOException {
DataOutputBuffer dob = new DataOutputBuffer(); DataOutputBuffer dob = new DataOutputBuffer();
Random r = new Random(); writeJunk(dob, 1000);
long seed = r.nextLong();
r.setSeed(seed);
System.out.println("SEED: " + seed);
writeJunk(dob, r, seed, 1000);
DataInputBuffer dib = new DataInputBuffer(); DataInputBuffer dib = new DataInputBuffer();
dib.reset(dob.getData(), 0, dob.getLength()); dib.reset(dob.getData(), 0, dob.getLength());
readJunk(dib, r, seed, 1000); readJunk(dib, 1000);
dob.reset(); dob.reset();
writeJunk(dob, r, seed, 1000); writeJunk(dob, 1000);
dib.reset(dob.getData(), 0, dob.getLength()); dib.reset(dob.getData(), 0, dob.getLength());
readJunk(dib, r, seed, 1000); readJunk(dib, 1000);
} }
@Test @Test
public void testByteBuffers() throws IOException { public void testDataInputByteBufferCompatibility() throws IOException {
DataOutputByteBuffer dob = new DataOutputByteBuffer();
Random r = new Random();
long seed = r.nextLong();
r.setSeed(seed);
System.out.println("SEED: " + seed);
writeJunk(dob, r, seed, 1000);
DataInputByteBuffer dib = new DataInputByteBuffer();
dib.reset(dob.getData());
readJunk(dib, r, seed, 1000);
dob.reset();
writeJunk(dob, r, seed, 1000);
dib.reset(dob.getData());
readJunk(dib, r, seed, 1000);
}
private static byte[] toBytes(ByteBuffer[] bufs, int len) {
byte[] ret = new byte[len];
int pos = 0;
for (int i = 0; i < bufs.length; ++i) {
int rem = bufs[i].remaining();
bufs[i].get(ret, pos, rem);
pos += rem;
}
return ret;
}
@Test
public void testDataOutputByteBufferCompatibility() throws IOException {
DataOutputBuffer dob = new DataOutputBuffer(); DataOutputBuffer dob = new DataOutputBuffer();
DataOutputByteBuffer dobb = new DataOutputByteBuffer(); writeJunk(dob, 1000);
Random r = new Random();
long seed = r.nextLong();
r.setSeed(seed);
System.out.println("SEED: " + seed);
writeJunk(dob, r, seed, 1000);
writeJunk(dobb, r, seed, 1000);
byte[] check = toBytes(dobb.getData(), dobb.getLength());
assertEquals(check.length, dob.getLength());
assertArrayEquals(check, Arrays.copyOf(dob.getData(), dob.getLength()));
dob.reset();
dobb.reset();
writeJunk(dob, r, seed, 3000);
writeJunk(dobb, r, seed, 3000);
check = toBytes(dobb.getData(), dobb.getLength());
assertEquals(check.length, dob.getLength());
assertArrayEquals(check, Arrays.copyOf(dob.getData(), dob.getLength()));
dob.reset();
dobb.reset();
writeJunk(dob, r, seed, 1000);
writeJunk(dobb, r, seed, 1000);
check = toBytes(dobb.getData(), dobb.getLength());
assertEquals("Failed Checking length = " + check.length,
check.length, dob.getLength());
assertArrayEquals(check, Arrays.copyOf(dob.getData(), dob.getLength()));
}
@Test
public void TestDataInputByteBufferCompatibility() throws IOException {
DataOutputBuffer dob = new DataOutputBuffer();
Random r = new Random();
long seed = r.nextLong();
r.setSeed(seed);
System.out.println("SEED: " + seed);
writeJunk(dob, r, seed, 1000);
ByteBuffer buf = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); ByteBuffer buf = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
DataInputByteBuffer dib = new DataInputByteBuffer(); DataInputByteBuffer dib = new DataInputByteBuffer();
dib.reset(buf); dib.reset(buf);
readJunk(dib, r, seed, 1000); readJunk(dib, 1000);
}
@Test
public void TestDataOutputByteBufferCompatibility() throws IOException {
DataOutputByteBuffer dob = new DataOutputByteBuffer();
Random r = new Random();
long seed = r.nextLong();
r.setSeed(seed);
System.out.println("SEED: " + seed);
writeJunk(dob, r, seed, 1000);
ByteBuffer buf = ByteBuffer.allocate(dob.getLength());
for (ByteBuffer b : dob.getData()) {
buf.put(b);
}
buf.flip();
DataInputBuffer dib = new DataInputBuffer();
dib.reset(buf.array(), 0, buf.remaining());
readJunk(dib, r, seed, 1000);
} }
} }

View File

@ -17,8 +17,11 @@
*/ */
package org.apache.hadoop.mapreduce; package org.apache.hadoop.mapreduce;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.nio.ByteBuffer;
import org.apache.hadoop.io.DataInputByteBuffer; import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.io.DataOutputByteBuffer;
import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.io.WritableUtils;
import org.junit.Test; import org.junit.Test;
import static org.junit.Assert.*; import static org.junit.Assert.*;
@ -73,7 +76,7 @@ public void testIsMap() {
* Test of getTaskType method, of class TaskID. * Test of getTaskType method, of class TaskID.
*/ */
@Test @Test
public void testGetTaskType_0args() { public void testGetTaskType0args() {
JobID jobId = new JobID("1234", 0); JobID jobId = new JobID("1234", 0);
for (TaskType type : TaskType.values()) { for (TaskType type : TaskType.values()) {
@ -253,17 +256,18 @@ public void testHashCode() {
*/ */
@Test @Test
public void testReadFields() throws Exception { public void testReadFields() throws Exception {
DataOutputByteBuffer out = new DataOutputByteBuffer(); ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream out = new DataOutputStream(baos);
out.writeInt(0); out.writeInt(0);
out.writeInt(1); out.writeInt(1);
WritableUtils.writeVInt(out, 4); WritableUtils.writeVInt(out, 4);
out.write(new byte[] { 0x31, 0x32, 0x33, 0x34}); out.write(new byte[] {0x31, 0x32, 0x33, 0x34});
WritableUtils.writeEnum(out, TaskType.REDUCE); WritableUtils.writeEnum(out, TaskType.REDUCE);
DataInputByteBuffer in = new DataInputByteBuffer(); DataInputByteBuffer in = new DataInputByteBuffer();
in.reset(out.getData()); in.reset(ByteBuffer.wrap(baos.toByteArray()));
TaskID instance = new TaskID(); TaskID instance = new TaskID();
@ -280,14 +284,15 @@ public void testReadFields() throws Exception {
public void testWrite() throws Exception { public void testWrite() throws Exception {
JobID jobId = new JobID("1234", 1); JobID jobId = new JobID("1234", 1);
TaskID taskId = new TaskID(jobId, TaskType.JOB_SETUP, 0); TaskID taskId = new TaskID(jobId, TaskType.JOB_SETUP, 0);
DataOutputByteBuffer out = new DataOutputByteBuffer(); ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream out = new DataOutputStream(baos);
taskId.write(out); taskId.write(out);
DataInputByteBuffer in = new DataInputByteBuffer(); DataInputByteBuffer in = new DataInputByteBuffer();
byte[] buffer = new byte[4]; byte[] buffer = new byte[4];
in.reset(out.getData()); in.reset(ByteBuffer.wrap(baos.toByteArray()));
assertEquals("The write() method did not write the expected task ID", assertEquals("The write() method did not write the expected task ID",
0, in.readInt()); 0, in.readInt());
@ -430,7 +435,7 @@ public void testGetRepresentingCharacter() {
* Test of getTaskType method, of class TaskID. * Test of getTaskType method, of class TaskID.
*/ */
@Test @Test
public void testGetTaskType_char() { public void testGetTaskTypeChar() {
assertEquals("The getTaskType() method did not return the expected type", assertEquals("The getTaskType() method did not return the expected type",
TaskType.MAP, TaskType.MAP,
TaskID.getTaskType('m')); TaskID.getTaskType('m'));