diff --git a/hadoop-mapreduce-project/CHANGES.MAPREDUCE-2841.txt b/hadoop-mapreduce-project/CHANGES.MAPREDUCE-2841.txt index cea5a7695d..e12f7439fc 100644 --- a/hadoop-mapreduce-project/CHANGES.MAPREDUCE-2841.txt +++ b/hadoop-mapreduce-project/CHANGES.MAPREDUCE-2841.txt @@ -4,3 +4,4 @@ Changes for Hadoop Native Map Output Collector MAPREDUCE-5985. native-task: Fix build on macosx. Contributed by Binglin Chang MAPREDUCE-5994. Simplify ByteUtils and fix failing test. (todd) MAPREDUCE-5996. native-task: Rename system tests into standard directory layout (todd) +MAPREDUCE-5997. native-task: Use DirectBufferPool from Hadoop Common (todd) diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeBatchProcessor.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeBatchProcessor.java index fd68ea6c74..837da0ebcb 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeBatchProcessor.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeBatchProcessor.java @@ -24,12 +24,13 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.mapred.nativetask.buffer.BufferType; -import org.apache.hadoop.mapred.nativetask.buffer.DirectBufferPool; import org.apache.hadoop.mapred.nativetask.buffer.InputBuffer; import org.apache.hadoop.mapred.nativetask.buffer.OutputBuffer; import org.apache.hadoop.mapred.nativetask.util.ReadWriteBuffer; import org.apache.hadoop.mapred.nativetask.util.ConfigUtil; +import org.apache.hadoop.util.DirectBufferPool; /** * used to create channel, transfer data and command between Java and native @@ -126,9 +127,8 @@ public synchronized void close() throws IOException { NativeRuntime.releaseNativeObject(nativeHandlerAddr); nativeHandlerAddr = 0; } - if (null != in && null != in.getByteBuffer() && in.getByteBuffer().isDirect()) { - DirectBufferPool.getInstance().returnBuffer(in.getByteBuffer()); - } + IOUtils.cleanup(LOG, in); + in = null; } @Override diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/DirectBufferPool.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/DirectBufferPool.java deleted file mode 100644 index bd3c6bb553..0000000000 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/DirectBufferPool.java +++ /dev/null @@ -1,93 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.mapred.nativetask.buffer; - -import java.io.IOException; -import java.lang.ref.WeakReference; -import java.nio.ByteBuffer; -import java.util.Queue; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ConcurrentMap; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - - -/** - * as direct buffer memory is not collected by GC, we keep a pool - * to reuse direct buffers - */ -public class DirectBufferPool { - - private static DirectBufferPool directBufferPool = null; - private static Log LOG = LogFactory.getLog(DirectBufferPool.class); - private ConcurrentMap>> bufferMap = new ConcurrentHashMap>>(); - - private DirectBufferPool() { - } - - public static synchronized DirectBufferPool getInstance() { - if (null == directBufferPool) { - directBufferPool = new DirectBufferPool(); - } - return directBufferPool; - } - - public static void destoryInstance(){ - directBufferPool = null; - } - - public synchronized ByteBuffer borrowBuffer(int capacity) throws IOException { - Queue> list = bufferMap.get(capacity); - if (null == list) { - return ByteBuffer.allocateDirect(capacity); - } - WeakReference ref; - while ((ref = list.poll()) != null) { - ByteBuffer buf = ref.get(); - if (buf != null) { - return buf; - } - } - return ByteBuffer.allocateDirect(capacity); - } - - public void returnBuffer(ByteBuffer buffer) throws IOException { - if (null == buffer || !buffer.isDirect()) { - throw new IOException("the buffer is null or the buffer returned is not direct buffer"); - } - - buffer.clear(); - int capacity = buffer.capacity(); - Queue> list = bufferMap.get(capacity); - if (null == list) { - list = new ConcurrentLinkedQueue>(); - Queue> prev = bufferMap.putIfAbsent(capacity, list); - if (prev != null) { - list = prev; - } - } - list.add(new WeakReference(buffer)); - } - - int getBufCountsForCapacity(int capacity) { - return bufferMap.get(capacity).size(); - } - -} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/InputBuffer.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/InputBuffer.java index eb151643e3..82193b8d22 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/InputBuffer.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/InputBuffer.java @@ -18,11 +18,16 @@ package org.apache.hadoop.mapred.nativetask.buffer; +import org.apache.hadoop.util.DirectBufferPool; + +import java.io.Closeable; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.ByteOrder; -public class InputBuffer { +public class InputBuffer implements Closeable { + + static DirectBufferPool bufferPool = new DirectBufferPool(); private ByteBuffer byteBuffer; private final BufferType type; @@ -36,7 +41,7 @@ public InputBuffer(BufferType type, int inputSize) throws IOException { switch (type) { case DIRECT_BUFFER: - this.byteBuffer = DirectBufferPool.getInstance().borrowBuffer(capacity); + this.byteBuffer = bufferPool.getBuffer(capacity); this.byteBuffer.order(ByteOrder.BIG_ENDIAN); break; case HEAP_BUFFER: @@ -118,4 +123,12 @@ public byte[] array() { } return byteBuffer.array(); } + + @Override + public void close() { + if (byteBuffer != null && byteBuffer.isDirect()) { + bufferPool.returnBuffer(byteBuffer); + byteBuffer = null; + } + } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/TestDirectBufferPool.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/TestDirectBufferPool.java deleted file mode 100644 index 09c1ef5c21..0000000000 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/TestDirectBufferPool.java +++ /dev/null @@ -1,201 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.mapred.nativetask.buffer; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotSame; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertSame; -import static org.junit.Assert.assertTrue; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.List; -import java.util.ArrayList; - -import org.junit.Test; - -public class TestDirectBufferPool { - - @Test - public void testGetInstance() throws Exception { - final int num = 100; - List pools = new ArrayList(); - Thread[] list = new Thread[num]; - for (int i = 0; i < num; i++) { - Thread t = getPoolThread(pools); - t.start(); - list[i] = t; - } - for (int i = 0; i < num; i++) { - try { - list[i].join(10000); - } catch (Exception e) { - e.printStackTrace(); - } - } - DirectBufferPool p1 = pools.get(0); - assertNotNull(p1); - for (int i = 1; i < pools.size(); i++) { - DirectBufferPool p2 = pools.get(i); - assertNotNull(p2); - assertSame(p1, p2); - } - } - - private Thread getPoolThread(final List pools) { - Thread t = new Thread() { - public void run() { - pools.add(DirectBufferPool.getInstance()); - } - }; - return t; - } - - - @Test - public void testBufBorrow() throws IOException { - final DirectBufferPool bufferPool = DirectBufferPool.getInstance(); - ByteBuffer b1 = bufferPool.borrowBuffer(100); - assertTrue(b1.isDirect()); - assertEquals(0, b1.position()); - assertEquals(100, b1.capacity()); - bufferPool.returnBuffer(b1); - ByteBuffer b2 = bufferPool.borrowBuffer(100); - assertTrue(b2.isDirect()); - assertEquals(0, b2.position()); - assertEquals(100, b2.capacity()); - assertSame(b1, b2); - - ByteBuffer b3 = bufferPool.borrowBuffer(100); - assertTrue(b3.isDirect()); - assertEquals(0, b3.position()); - assertEquals(100, b3.capacity()); - assertNotSame(b2, b3); - bufferPool.returnBuffer(b2); - bufferPool.returnBuffer(b3); - } - - @Test - public void testBufReset() throws IOException { - final DirectBufferPool bufferPool = DirectBufferPool.getInstance(); - ByteBuffer b1 = bufferPool.borrowBuffer(100); - assertTrue(b1.isDirect()); - assertEquals(0, b1.position()); - assertEquals(100, b1.capacity()); - b1.putInt(1); - assertEquals(4, b1.position()); - bufferPool.returnBuffer(b1); - ByteBuffer b2 = bufferPool.borrowBuffer(100); - assertSame(b1, b2); - assertTrue(b2.isDirect()); - assertEquals(0, b2.position()); - assertEquals(100, b2.capacity()); - } - - @Test - public void testBufReturn() throws IOException { - final DirectBufferPool bufferPool = DirectBufferPool.getInstance(); - int numOfBufs = 100; - int capacity = 100; - final ByteBuffer[] bufs = new ByteBuffer[numOfBufs]; - for (int i = 0; i < numOfBufs; i++) { - bufs[i] = bufferPool.borrowBuffer(capacity); - } - - assertEquals(0, bufferPool.getBufCountsForCapacity(capacity)); - - - int numOfThreads = numOfBufs; - Thread[] list = new Thread[numOfThreads]; - for (int i = 0; i < numOfThreads; i++) { - Thread t = retBufThread(bufferPool, bufs, i); - t.start(); - list[i] = t; - } - for (int i = 0; i < numOfThreads; i++) { - try { - list[i].join(10000); - } catch (Exception e) { - e.printStackTrace(); - } - } - - assertEquals(numOfBufs, bufferPool.getBufCountsForCapacity(capacity)); - } - - private Thread retBufThread(final DirectBufferPool bufferPool, final ByteBuffer[] bufs, final int i) { - Thread t = new Thread(new Runnable(){ - @Override - public void run() { - try { - bufferPool.returnBuffer(bufs[i]); - } catch (Exception e) { - e.printStackTrace(); - } - } - }); - return t; - } - - @Test - public void testBufException() { - final DirectBufferPool bufferPool = DirectBufferPool.getInstance(); - boolean thrown = false; - try { - bufferPool.returnBuffer(null); - } catch (IOException e) { - thrown = true; - } - assertEquals(true, thrown); - - thrown = false; - ByteBuffer buf = ByteBuffer.allocate(100); - try { - bufferPool.returnBuffer(buf); - } catch (IOException e) { - thrown = true; - } - assertEquals(true, thrown); - } - - @Test - public void testBufWeakRefClear() throws IOException { - final DirectBufferPool bufferPool = DirectBufferPool.getInstance(); - int numOfBufs = 100; - int capacity = 100; - ByteBuffer[] list = new ByteBuffer[capacity]; - for (int i = 0; i < numOfBufs; i++) { - list[i] = bufferPool.borrowBuffer(capacity); - } - for (int i = 0; i < numOfBufs; i++) { - bufferPool.returnBuffer(list[i]); - list[i] = null; - } - - assertEquals(numOfBufs, bufferPool.getBufCountsForCapacity(capacity)); - - for (int i = 0; i < 3; i++) { - System.gc(); - } - - ByteBuffer b = bufferPool.borrowBuffer(capacity); - assertEquals(0, bufferPool.getBufCountsForCapacity(capacity)); - } -}