diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ByteBufferPool.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ByteBufferPool.java index aa5f8731c5..b30e7cfb9c 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ByteBufferPool.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ByteBufferPool.java @@ -45,4 +45,9 @@ public interface ByteBufferPool { * @param buffer a direct bytebuffer */ void putBuffer(ByteBuffer buffer); + + /** + * Clear the buffer pool thus releasing all the buffers. + */ + default void release() { } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ElasticByteBufferPool.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ElasticByteBufferPool.java index 6a162c3ff2..c4c2940622 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ElasticByteBufferPool.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ElasticByteBufferPool.java @@ -36,8 +36,8 @@ */ @InterfaceAudience.Public @InterfaceStability.Stable -public final class ElasticByteBufferPool implements ByteBufferPool { - private static final class Key implements Comparable { +public class ElasticByteBufferPool implements ByteBufferPool { + protected static final class Key implements Comparable { private final int capacity; private final long insertionTime; diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/WeakReferencedElasticByteBufferPool.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/WeakReferencedElasticByteBufferPool.java new file mode 100644 index 0000000000..c71c44e798 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/WeakReferencedElasticByteBufferPool.java @@ -0,0 +1,155 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.io; + +import java.lang.ref.WeakReference; +import java.nio.ByteBuffer; +import java.util.Map; +import java.util.TreeMap; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.classification.VisibleForTesting; + +/** + * Buffer pool implementation which uses weak references to store + * buffers in the pool, such that they are garbage collected when + * there are no references to the buffer during a gc run. This is + * important as direct buffers don't get garbage collected automatically + * during a gc run as they are not stored on heap memory. + * Also the buffers are stored in a tree map which helps in returning + * smallest buffer whose size is just greater than requested length. + * This is a thread safe implementation. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public final class WeakReferencedElasticByteBufferPool extends ElasticByteBufferPool { + + /** + * Map to store direct byte buffers of different sizes in the pool. + * Used tree map such that we can return next greater than capacity + * buffer if buffer with exact capacity is unavailable. + * This must be accessed in synchronized blocks. + */ + private final TreeMap> directBuffers = + new TreeMap<>(); + + /** + * Map to store heap based byte buffers of different sizes in the pool. + * Used tree map such that we can return next greater than capacity + * buffer if buffer with exact capacity is unavailable. + * This must be accessed in synchronized blocks. + */ + private final TreeMap> heapBuffers = + new TreeMap<>(); + + /** + * Method to get desired buffer tree. + * @param isDirect whether the buffer is heap based or direct. + * @return corresponding buffer tree. + */ + private TreeMap> getBufferTree(boolean isDirect) { + return isDirect + ? directBuffers + : heapBuffers; + } + + /** + * {@inheritDoc} + * + * @param direct whether we want a direct byte buffer or a heap one. + * @param length length of requested buffer. + * @return returns equal or next greater than capacity buffer from + * pool if already available and not garbage collected else creates + * a new buffer and return it. + */ + @Override + public synchronized ByteBuffer getBuffer(boolean direct, int length) { + TreeMap> buffersTree = getBufferTree(direct); + + // Scan the entire tree and remove all weak null references. + buffersTree.entrySet().removeIf(next -> next.getValue().get() == null); + + Map.Entry> entry = + buffersTree.ceilingEntry(new Key(length, 0)); + // If there is no buffer present in the pool with desired size. + if (entry == null) { + return direct ? ByteBuffer.allocateDirect(length) : + ByteBuffer.allocate(length); + } + // buffer is available in the pool and not garbage collected. + WeakReference bufferInPool = entry.getValue(); + buffersTree.remove(entry.getKey()); + ByteBuffer buffer = bufferInPool.get(); + if (buffer != null) { + return buffer; + } + // buffer was in pool but already got garbage collected. + return direct + ? ByteBuffer.allocateDirect(length) + : ByteBuffer.allocate(length); + } + + /** + * Return buffer to the pool. + * @param buffer buffer to be returned. + */ + @Override + public synchronized void putBuffer(ByteBuffer buffer) { + buffer.clear(); + TreeMap> buffersTree = getBufferTree(buffer.isDirect()); + // Buffers are indexed by (capacity, time). + // If our key is not unique on the first try, we try again, since the + // time will be different. Since we use nanoseconds, it's pretty + // unlikely that we'll loop even once, unless the system clock has a + // poor granularity or multi-socket systems have clocks slightly out + // of sync. + while (true) { + Key keyToInsert = new Key(buffer.capacity(), System.nanoTime()); + if (!buffersTree.containsKey(keyToInsert)) { + buffersTree.put(keyToInsert, new WeakReference<>(buffer)); + return; + } + } + } + + /** + * Clear the buffer pool thus releasing all the buffers. + * The caller must remove all references of + * existing buffers before calling this method to avoid + * memory leaks. + */ + @Override + public synchronized void release() { + heapBuffers.clear(); + directBuffers.clear(); + } + + /** + * Get current buffers count in the pool. + * @param isDirect whether we want to count the heap or direct buffers. + * @return count of buffers. + */ + @VisibleForTesting + public synchronized int getCurrentBuffersCount(boolean isDirect) { + return isDirect + ? directBuffers.size() + : heapBuffers.size(); + } +} diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestMoreWeakReferencedElasticByteBufferPool.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestMoreWeakReferencedElasticByteBufferPool.java new file mode 100644 index 0000000000..6ca380ef0e --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestMoreWeakReferencedElasticByteBufferPool.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.io; + +import java.nio.BufferOverflowException; +import java.nio.ByteBuffer; + +import org.assertj.core.api.Assertions; +import org.junit.Test; + +import org.apache.hadoop.test.HadoopTestBase; + +import static org.apache.hadoop.test.LambdaTestUtils.intercept; + +/** + * Non parameterized tests for {@code WeakReferencedElasticByteBufferPool}. + */ +public class TestMoreWeakReferencedElasticByteBufferPool + extends HadoopTestBase { + + @Test + public void testMixedBuffersInPool() { + WeakReferencedElasticByteBufferPool pool = new WeakReferencedElasticByteBufferPool(); + ByteBuffer buffer1 = pool.getBuffer(true, 5); + ByteBuffer buffer2 = pool.getBuffer(true, 10); + ByteBuffer buffer3 = pool.getBuffer(false, 5); + ByteBuffer buffer4 = pool.getBuffer(false, 10); + ByteBuffer buffer5 = pool.getBuffer(true, 15); + + assertBufferCounts(pool, 0, 0); + pool.putBuffer(buffer1); + pool.putBuffer(buffer2); + assertBufferCounts(pool, 2, 0); + pool.putBuffer(buffer3); + assertBufferCounts(pool, 2, 1); + pool.putBuffer(buffer5); + assertBufferCounts(pool, 3, 1); + pool.putBuffer(buffer4); + assertBufferCounts(pool, 3, 2); + pool.release(); + assertBufferCounts(pool, 0, 0); + + } + + @Test + public void testUnexpectedBufferSizes() throws Exception { + WeakReferencedElasticByteBufferPool pool = new WeakReferencedElasticByteBufferPool(); + ByteBuffer buffer1 = pool.getBuffer(true, 0); + + // try writing a random byte in a 0 length buffer. + // Expected exception as buffer requested is of size 0. + intercept(BufferOverflowException.class, + () -> buffer1.put(new byte[1])); + + // Expected IllegalArgumentException as negative length buffer is requested. + intercept(IllegalArgumentException.class, + () -> pool.getBuffer(true, -5)); + + // test returning null buffer to the pool. + intercept(NullPointerException.class, + () -> pool.putBuffer(null)); + } + + /** + * Utility method to assert counts of direct and heap buffers in + * the given buffer pool. + * @param pool buffer pool. + * @param numDirectBuffersExpected expected number of direct buffers. + * @param numHeapBuffersExpected expected number of heap buffers. + */ + private void assertBufferCounts(WeakReferencedElasticByteBufferPool pool, + int numDirectBuffersExpected, + int numHeapBuffersExpected) { + Assertions.assertThat(pool.getCurrentBuffersCount(true)) + .describedAs("Number of direct buffers in pool") + .isEqualTo(numDirectBuffersExpected); + Assertions.assertThat(pool.getCurrentBuffersCount(false)) + .describedAs("Number of heap buffers in pool") + .isEqualTo(numHeapBuffersExpected); + } +} diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestWeakReferencedElasticByteBufferPool.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestWeakReferencedElasticByteBufferPool.java new file mode 100644 index 0000000000..1434010ffa --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestWeakReferencedElasticByteBufferPool.java @@ -0,0 +1,232 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.io; + +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.List; +import java.util.Random; + +import org.assertj.core.api.Assertions; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import org.apache.hadoop.test.HadoopTestBase; + +/** + * Unit tests for {@code WeakReferencedElasticByteBufferPool}. + */ +@RunWith(Parameterized.class) +public class TestWeakReferencedElasticByteBufferPool + extends HadoopTestBase { + + private final boolean isDirect; + + private final String type; + + @Parameterized.Parameters(name = "Buffer type : {0}") + public static List params() { + return Arrays.asList("direct", "array"); + } + + public TestWeakReferencedElasticByteBufferPool(String type) { + this.type = type; + this.isDirect = !"array".equals(type); + } + + @Test + public void testGetAndPutBasic() { + WeakReferencedElasticByteBufferPool pool = new WeakReferencedElasticByteBufferPool(); + int bufferSize = 5; + ByteBuffer buffer = pool.getBuffer(isDirect, bufferSize); + Assertions.assertThat(buffer.isDirect()) + .describedAs("Buffered returned should be of correct type {}", type) + .isEqualTo(isDirect); + Assertions.assertThat(buffer.capacity()) + .describedAs("Initial capacity of returned buffer from pool") + .isEqualTo(bufferSize); + Assertions.assertThat(buffer.position()) + .describedAs("Initial position of returned buffer from pool") + .isEqualTo(0); + + byte[] arr = createByteArray(bufferSize); + buffer.put(arr, 0, arr.length); + buffer.flip(); + validateBufferContent(buffer, arr); + Assertions.assertThat(buffer.position()) + .describedAs("Buffer's position after filling bytes in it") + .isEqualTo(bufferSize); + // releasing buffer to the pool. + pool.putBuffer(buffer); + Assertions.assertThat(buffer.position()) + .describedAs("Position should be reset to 0 after returning buffer to the pool") + .isEqualTo(0); + + } + + @Test + public void testPoolingWithDifferentSizes() { + WeakReferencedElasticByteBufferPool pool = new WeakReferencedElasticByteBufferPool(); + ByteBuffer buffer = pool.getBuffer(isDirect, 5); + ByteBuffer buffer1 = pool.getBuffer(isDirect, 10); + ByteBuffer buffer2 = pool.getBuffer(isDirect, 15); + + Assertions.assertThat(pool.getCurrentBuffersCount(isDirect)) + .describedAs("Number of buffers in the pool") + .isEqualTo(0); + + pool.putBuffer(buffer1); + pool.putBuffer(buffer2); + Assertions.assertThat(pool.getCurrentBuffersCount(isDirect)) + .describedAs("Number of buffers in the pool") + .isEqualTo(2); + ByteBuffer buffer3 = pool.getBuffer(isDirect, 12); + Assertions.assertThat(buffer3.capacity()) + .describedAs("Pooled buffer should have older capacity") + .isEqualTo(15); + Assertions.assertThat(pool.getCurrentBuffersCount(isDirect)) + .describedAs("Number of buffers in the pool") + .isEqualTo(1); + pool.putBuffer(buffer); + ByteBuffer buffer4 = pool.getBuffer(isDirect, 6); + Assertions.assertThat(buffer4.capacity()) + .describedAs("Pooled buffer should have older capacity") + .isEqualTo(10); + Assertions.assertThat(pool.getCurrentBuffersCount(isDirect)) + .describedAs("Number of buffers in the pool") + .isEqualTo(1); + + pool.release(); + Assertions.assertThat(pool.getCurrentBuffersCount(isDirect)) + .describedAs("Number of buffers in the pool post release") + .isEqualTo(0); + } + + @Test + public void testPoolingWithDifferentInsertionTime() { + WeakReferencedElasticByteBufferPool pool = new WeakReferencedElasticByteBufferPool(); + ByteBuffer buffer = pool.getBuffer(isDirect, 10); + ByteBuffer buffer1 = pool.getBuffer(isDirect, 10); + ByteBuffer buffer2 = pool.getBuffer(isDirect, 10); + + Assertions.assertThat(pool.getCurrentBuffersCount(isDirect)) + .describedAs("Number of buffers in the pool") + .isEqualTo(0); + + pool.putBuffer(buffer1); + pool.putBuffer(buffer2); + Assertions.assertThat(pool.getCurrentBuffersCount(isDirect)) + .describedAs("Number of buffers in the pool") + .isEqualTo(2); + ByteBuffer buffer3 = pool.getBuffer(isDirect, 10); + // As buffer1 is returned to the pool before buffer2, it should + // be returned when buffer of same size is asked again from + // the pool. Memory references must match not just content + // that is why {@code Assertions.isSameAs} is used here rather + // than usual {@code Assertions.isEqualTo}. + Assertions.assertThat(buffer3) + .describedAs("Buffers should be returned in order of their " + + "insertion time") + .isSameAs(buffer1); + pool.putBuffer(buffer); + ByteBuffer buffer4 = pool.getBuffer(isDirect, 10); + Assertions.assertThat(buffer4) + .describedAs("Buffers should be returned in order of their " + + "insertion time") + .isSameAs(buffer2); + } + + @Test + public void testGarbageCollection() { + WeakReferencedElasticByteBufferPool pool = new WeakReferencedElasticByteBufferPool(); + ByteBuffer buffer = pool.getBuffer(isDirect, 5); + ByteBuffer buffer1 = pool.getBuffer(isDirect, 10); + ByteBuffer buffer2 = pool.getBuffer(isDirect, 15); + Assertions.assertThat(pool.getCurrentBuffersCount(isDirect)) + .describedAs("Number of buffers in the pool") + .isEqualTo(0); + pool.putBuffer(buffer1); + pool.putBuffer(buffer2); + Assertions.assertThat(pool.getCurrentBuffersCount(isDirect)) + .describedAs("Number of buffers in the pool") + .isEqualTo(2); + // Before GC. + ByteBuffer buffer4 = pool.getBuffer(isDirect, 12); + Assertions.assertThat(buffer4.capacity()) + .describedAs("Pooled buffer should have older capacity") + .isEqualTo(15); + pool.putBuffer(buffer4); + // Removing the references + buffer1 = null; + buffer2 = null; + buffer4 = null; + System.gc(); + ByteBuffer buffer3 = pool.getBuffer(isDirect, 12); + Assertions.assertThat(buffer3.capacity()) + .describedAs("After garbage collection new buffer should be " + + "returned with fixed capacity") + .isEqualTo(12); + } + + @Test + public void testWeakReferencesPruning() { + WeakReferencedElasticByteBufferPool pool = new WeakReferencedElasticByteBufferPool(); + ByteBuffer buffer1 = pool.getBuffer(isDirect, 5); + ByteBuffer buffer2 = pool.getBuffer(isDirect, 10); + ByteBuffer buffer3 = pool.getBuffer(isDirect, 15); + + pool.putBuffer(buffer2); + pool.putBuffer(buffer3); + Assertions.assertThat(pool.getCurrentBuffersCount(isDirect)) + .describedAs("Number of buffers in the pool") + .isEqualTo(2); + + // marking only buffer2 to be garbage collected. + buffer2 = null; + System.gc(); + ByteBuffer buffer4 = pool.getBuffer(isDirect, 10); + // Number of buffers in the pool is 0 as one got garbage + // collected and other got returned in above call. + Assertions.assertThat(pool.getCurrentBuffersCount(isDirect)) + .describedAs("Number of buffers in the pool") + .isEqualTo(0); + Assertions.assertThat(buffer4.capacity()) + .describedAs("After gc, pool should return next greater than " + + "available buffer") + .isEqualTo(15); + + } + + private void validateBufferContent(ByteBuffer buffer, byte[] arr) { + for (int i=0; i