MAPREDUCE-5997. native-task: Use DirectBufferPool from Hadoop Common. Contributed by Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/MR-2841@1613034 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Todd Lipcon 2014-07-24 08:20:25 +00:00
parent 5149a8a6f1
commit 77acc70df5
5 changed files with 20 additions and 300 deletions

View File

@ -4,3 +4,4 @@ Changes for Hadoop Native Map Output Collector
MAPREDUCE-5985. native-task: Fix build on macosx. Contributed by Binglin Chang
MAPREDUCE-5994. Simplify ByteUtils and fix failing test. (todd)
MAPREDUCE-5996. native-task: Rename system tests into standard directory layout (todd)
MAPREDUCE-5997. native-task: Use DirectBufferPool from Hadoop Common (todd)

View File

@ -24,12 +24,13 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.mapred.nativetask.buffer.BufferType;
import org.apache.hadoop.mapred.nativetask.buffer.DirectBufferPool;
import org.apache.hadoop.mapred.nativetask.buffer.InputBuffer;
import org.apache.hadoop.mapred.nativetask.buffer.OutputBuffer;
import org.apache.hadoop.mapred.nativetask.util.ReadWriteBuffer;
import org.apache.hadoop.mapred.nativetask.util.ConfigUtil;
import org.apache.hadoop.util.DirectBufferPool;
/**
* used to create channel, transfer data and command between Java and native
@ -126,9 +127,8 @@ public synchronized void close() throws IOException {
NativeRuntime.releaseNativeObject(nativeHandlerAddr);
nativeHandlerAddr = 0;
}
if (null != in && null != in.getByteBuffer() && in.getByteBuffer().isDirect()) {
DirectBufferPool.getInstance().returnBuffer(in.getByteBuffer());
}
IOUtils.cleanup(LOG, in);
in = null;
}
@Override

View File

@ -1,93 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred.nativetask.buffer;
import java.io.IOException;
import java.lang.ref.WeakReference;
import java.nio.ByteBuffer;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* as direct buffer memory is not collected by GC, we keep a pool
* to reuse direct buffers
*/
public class DirectBufferPool {
private static DirectBufferPool directBufferPool = null;
private static Log LOG = LogFactory.getLog(DirectBufferPool.class);
private ConcurrentMap<Integer, Queue<WeakReference<ByteBuffer>>> bufferMap = new ConcurrentHashMap<Integer, Queue<WeakReference<ByteBuffer>>>();
private DirectBufferPool() {
}
public static synchronized DirectBufferPool getInstance() {
if (null == directBufferPool) {
directBufferPool = new DirectBufferPool();
}
return directBufferPool;
}
public static void destoryInstance(){
directBufferPool = null;
}
public synchronized ByteBuffer borrowBuffer(int capacity) throws IOException {
Queue<WeakReference<ByteBuffer>> list = bufferMap.get(capacity);
if (null == list) {
return ByteBuffer.allocateDirect(capacity);
}
WeakReference<ByteBuffer> ref;
while ((ref = list.poll()) != null) {
ByteBuffer buf = ref.get();
if (buf != null) {
return buf;
}
}
return ByteBuffer.allocateDirect(capacity);
}
public void returnBuffer(ByteBuffer buffer) throws IOException {
if (null == buffer || !buffer.isDirect()) {
throw new IOException("the buffer is null or the buffer returned is not direct buffer");
}
buffer.clear();
int capacity = buffer.capacity();
Queue<WeakReference<ByteBuffer>> list = bufferMap.get(capacity);
if (null == list) {
list = new ConcurrentLinkedQueue<WeakReference<ByteBuffer>>();
Queue<WeakReference<ByteBuffer>> prev = bufferMap.putIfAbsent(capacity, list);
if (prev != null) {
list = prev;
}
}
list.add(new WeakReference<ByteBuffer>(buffer));
}
int getBufCountsForCapacity(int capacity) {
return bufferMap.get(capacity).size();
}
}

View File

@ -18,11 +18,16 @@
package org.apache.hadoop.mapred.nativetask.buffer;
import org.apache.hadoop.util.DirectBufferPool;
import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
public class InputBuffer {
public class InputBuffer implements Closeable {
static DirectBufferPool bufferPool = new DirectBufferPool();
private ByteBuffer byteBuffer;
private final BufferType type;
@ -36,7 +41,7 @@ public InputBuffer(BufferType type, int inputSize) throws IOException {
switch (type) {
case DIRECT_BUFFER:
this.byteBuffer = DirectBufferPool.getInstance().borrowBuffer(capacity);
this.byteBuffer = bufferPool.getBuffer(capacity);
this.byteBuffer.order(ByteOrder.BIG_ENDIAN);
break;
case HEAP_BUFFER:
@ -118,4 +123,12 @@ public byte[] array() {
}
return byteBuffer.array();
}
@Override
public void close() {
if (byteBuffer != null && byteBuffer.isDirect()) {
bufferPool.returnBuffer(byteBuffer);
byteBuffer = null;
}
}
}

View File

@ -1,201 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred.nativetask.buffer;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.ArrayList;
import org.junit.Test;
public class TestDirectBufferPool {
@Test
public void testGetInstance() throws Exception {
final int num = 100;
List<DirectBufferPool> pools = new ArrayList<DirectBufferPool>();
Thread[] list = new Thread[num];
for (int i = 0; i < num; i++) {
Thread t = getPoolThread(pools);
t.start();
list[i] = t;
}
for (int i = 0; i < num; i++) {
try {
list[i].join(10000);
} catch (Exception e) {
e.printStackTrace();
}
}
DirectBufferPool p1 = pools.get(0);
assertNotNull(p1);
for (int i = 1; i < pools.size(); i++) {
DirectBufferPool p2 = pools.get(i);
assertNotNull(p2);
assertSame(p1, p2);
}
}
private Thread getPoolThread(final List<DirectBufferPool> pools) {
Thread t = new Thread() {
public void run() {
pools.add(DirectBufferPool.getInstance());
}
};
return t;
}
@Test
public void testBufBorrow() throws IOException {
final DirectBufferPool bufferPool = DirectBufferPool.getInstance();
ByteBuffer b1 = bufferPool.borrowBuffer(100);
assertTrue(b1.isDirect());
assertEquals(0, b1.position());
assertEquals(100, b1.capacity());
bufferPool.returnBuffer(b1);
ByteBuffer b2 = bufferPool.borrowBuffer(100);
assertTrue(b2.isDirect());
assertEquals(0, b2.position());
assertEquals(100, b2.capacity());
assertSame(b1, b2);
ByteBuffer b3 = bufferPool.borrowBuffer(100);
assertTrue(b3.isDirect());
assertEquals(0, b3.position());
assertEquals(100, b3.capacity());
assertNotSame(b2, b3);
bufferPool.returnBuffer(b2);
bufferPool.returnBuffer(b3);
}
@Test
public void testBufReset() throws IOException {
final DirectBufferPool bufferPool = DirectBufferPool.getInstance();
ByteBuffer b1 = bufferPool.borrowBuffer(100);
assertTrue(b1.isDirect());
assertEquals(0, b1.position());
assertEquals(100, b1.capacity());
b1.putInt(1);
assertEquals(4, b1.position());
bufferPool.returnBuffer(b1);
ByteBuffer b2 = bufferPool.borrowBuffer(100);
assertSame(b1, b2);
assertTrue(b2.isDirect());
assertEquals(0, b2.position());
assertEquals(100, b2.capacity());
}
@Test
public void testBufReturn() throws IOException {
final DirectBufferPool bufferPool = DirectBufferPool.getInstance();
int numOfBufs = 100;
int capacity = 100;
final ByteBuffer[] bufs = new ByteBuffer[numOfBufs];
for (int i = 0; i < numOfBufs; i++) {
bufs[i] = bufferPool.borrowBuffer(capacity);
}
assertEquals(0, bufferPool.getBufCountsForCapacity(capacity));
int numOfThreads = numOfBufs;
Thread[] list = new Thread[numOfThreads];
for (int i = 0; i < numOfThreads; i++) {
Thread t = retBufThread(bufferPool, bufs, i);
t.start();
list[i] = t;
}
for (int i = 0; i < numOfThreads; i++) {
try {
list[i].join(10000);
} catch (Exception e) {
e.printStackTrace();
}
}
assertEquals(numOfBufs, bufferPool.getBufCountsForCapacity(capacity));
}
private Thread retBufThread(final DirectBufferPool bufferPool, final ByteBuffer[] bufs, final int i) {
Thread t = new Thread(new Runnable(){
@Override
public void run() {
try {
bufferPool.returnBuffer(bufs[i]);
} catch (Exception e) {
e.printStackTrace();
}
}
});
return t;
}
@Test
public void testBufException() {
final DirectBufferPool bufferPool = DirectBufferPool.getInstance();
boolean thrown = false;
try {
bufferPool.returnBuffer(null);
} catch (IOException e) {
thrown = true;
}
assertEquals(true, thrown);
thrown = false;
ByteBuffer buf = ByteBuffer.allocate(100);
try {
bufferPool.returnBuffer(buf);
} catch (IOException e) {
thrown = true;
}
assertEquals(true, thrown);
}
@Test
public void testBufWeakRefClear() throws IOException {
final DirectBufferPool bufferPool = DirectBufferPool.getInstance();
int numOfBufs = 100;
int capacity = 100;
ByteBuffer[] list = new ByteBuffer[capacity];
for (int i = 0; i < numOfBufs; i++) {
list[i] = bufferPool.borrowBuffer(capacity);
}
for (int i = 0; i < numOfBufs; i++) {
bufferPool.returnBuffer(list[i]);
list[i] = null;
}
assertEquals(numOfBufs, bufferPool.getBufCountsForCapacity(capacity));
for (int i = 0; i < 3; i++) {
System.gc();
}
ByteBuffer b = bufferPool.borrowBuffer(capacity);
assertEquals(0, bufferPool.getBufCountsForCapacity(capacity));
}
}