HDFS-8859. Improve DataNode ReplicaMap memory footprint to save about 45%. (yliu)

This commit is contained in:
yliu 2015-09-29 16:20:35 +08:00
parent 151fca5032
commit d6fa34e014
11 changed files with 569 additions and 64 deletions

View File

@ -17,6 +17,8 @@
*/ */
package org.apache.hadoop.util; package org.apache.hadoop.util;
import java.util.Collection;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
@ -86,5 +88,17 @@ public interface GSet<K, E extends K> extends Iterable<E> {
*/ */
E remove(K key); E remove(K key);
/**
* Clear the set.
*/
void clear(); void clear();
/**
* Returns a {@link Collection} view of the values contained in this set.
* The collection is backed by the set, so changes to the set are
* reflected in the collection, and vice-versa.
*
* @return the collection of values.
*/
Collection<E> values();
} }

View File

@ -17,6 +17,7 @@
*/ */
package org.apache.hadoop.util; package org.apache.hadoop.util;
import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
@ -70,4 +71,9 @@ public Iterator<E> iterator() {
public void clear() { public void clear() {
m.clear(); m.clear();
} }
@Override
public Collection<E> values() {
return m.values();
}
} }

View File

@ -18,12 +18,14 @@
package org.apache.hadoop.util; package org.apache.hadoop.util;
import java.io.PrintStream; import java.io.PrintStream;
import java.util.AbstractCollection;
import java.util.Arrays;
import java.util.Collection;
import java.util.ConcurrentModificationException; import java.util.ConcurrentModificationException;
import java.util.Iterator; import java.util.Iterator;
import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.util.StringUtils;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
@ -49,12 +51,12 @@ public class LightWeightGSet<K, E extends K> implements GSet<K, E> {
/** /**
* Elements of {@link LightWeightGSet}. * Elements of {@link LightWeightGSet}.
*/ */
public static interface LinkedElement { public interface LinkedElement {
/** Set the next element. */ /** Set the next element. */
public void setNext(LinkedElement next); void setNext(LinkedElement next);
/** Get the next element. */ /** Get the next element. */
public LinkedElement getNext(); LinkedElement getNext();
} }
static final int MAX_ARRAY_LENGTH = 1 << 30; //prevent int overflow problem static final int MAX_ARRAY_LENGTH = 1 << 30; //prevent int overflow problem
@ -64,15 +66,20 @@ public static interface LinkedElement {
* An internal array of entries, which are the rows of the hash table. * An internal array of entries, which are the rows of the hash table.
* The size must be a power of two. * The size must be a power of two.
*/ */
private final LinkedElement[] entries; protected LinkedElement[] entries;
/** A mask for computing the array index from the hash value of an element. */ /** A mask for computing the array index from the hash value of an element. */
private final int hash_mask; protected int hash_mask;
/** The size of the set (not the entry array). */ /** The size of the set (not the entry array). */
private int size = 0; protected int size = 0;
/** Modification version for fail-fast. /** Modification version for fail-fast.
* @see ConcurrentModificationException * @see ConcurrentModificationException
*/ */
private int modification = 0; protected int modification = 0;
private Collection<E> values;
protected LightWeightGSet() {
}
/** /**
* @param recommended_length Recommended size of the internal array. * @param recommended_length Recommended size of the internal array.
@ -87,7 +94,7 @@ public LightWeightGSet(final int recommended_length) {
} }
//compute actual length //compute actual length
private static int actualArrayLength(int recommended) { protected static int actualArrayLength(int recommended) {
if (recommended > MAX_ARRAY_LENGTH) { if (recommended > MAX_ARRAY_LENGTH) {
return MAX_ARRAY_LENGTH; return MAX_ARRAY_LENGTH;
} else if (recommended < MIN_ARRAY_LENGTH) { } else if (recommended < MIN_ARRAY_LENGTH) {
@ -103,11 +110,11 @@ public int size() {
return size; return size;
} }
private int getIndex(final K key) { protected int getIndex(final K key) {
return key.hashCode() & hash_mask; return key.hashCode() & hash_mask;
} }
private E convert(final LinkedElement e){ protected E convert(final LinkedElement e){
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
final E r = (E)e; final E r = (E)e;
return r; return r;
@ -138,24 +145,26 @@ public boolean contains(final K key) {
@Override @Override
public E put(final E element) { public E put(final E element) {
//validate element // validate element
if (element == null) { if (element == null) {
throw new NullPointerException("Null element is not supported."); throw new NullPointerException("Null element is not supported.");
} }
if (!(element instanceof LinkedElement)) { LinkedElement e = null;
try {
e = (LinkedElement)element;
} catch (ClassCastException ex) {
throw new HadoopIllegalArgumentException( throw new HadoopIllegalArgumentException(
"!(element instanceof LinkedElement), element.getClass()=" "!(element instanceof LinkedElement), element.getClass()="
+ element.getClass()); + element.getClass());
} }
final LinkedElement e = (LinkedElement)element;
//find index // find index
final int index = getIndex(element); final int index = getIndex(element);
//remove if it already exists // remove if it already exists
final E existing = remove(index, element); final E existing = remove(index, element);
//insert the element to the head of the linked list // insert the element to the head of the linked list
modification++; modification++;
size++; size++;
e.setNext(entries[index]); e.setNext(entries[index]);
@ -171,7 +180,7 @@ public E put(final E element) {
* @return If such element exists, return it. * @return If such element exists, return it.
* Otherwise, return null. * Otherwise, return null.
*/ */
private E remove(final int index, final K key) { protected E remove(final int index, final K key) {
if (entries[index] == null) { if (entries[index] == null) {
return null; return null;
} else if (entries[index].equals(key)) { } else if (entries[index].equals(key)) {
@ -213,6 +222,38 @@ public E remove(final K key) {
return remove(getIndex(key), key); return remove(getIndex(key), key);
} }
@Override
public Collection<E> values() {
if (values == null) {
values = new Values();
}
return values;
}
private final class Values extends AbstractCollection<E> {
@Override
public Iterator<E> iterator() {
return LightWeightGSet.this.iterator();
}
@Override
public int size() {
return size;
}
@SuppressWarnings("unchecked")
@Override
public boolean contains(Object o) {
return LightWeightGSet.this.contains((K)o);
}
@Override
public void clear() {
LightWeightGSet.this.clear();
}
}
@Override @Override
public Iterator<E> iterator() { public Iterator<E> iterator() {
return new SetIterator(); return new SetIterator();
@ -363,9 +404,8 @@ static int computeCapacity(long maxMemory, double percentage,
} }
public void clear() { public void clear() {
for (int i = 0; i < entries.length; i++) { modification++;
entries[i] = null; Arrays.fill(entries, null);
}
size = 0; size = 0;
} }
} }

View File

@ -0,0 +1,129 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.util;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
/**
* A low memory footprint {@link GSet} implementation,
* which uses an array for storing the elements
* and linked lists for collision resolution.
*
* If the size of elements exceeds the threshold,
* the internal array will be resized to double length.
*
* This class does not support null element.
*
* This class is not thread safe.
*
* @param <K> Key type for looking up the elements
* @param <E> Element type, which must be
* (1) a subclass of K, and
* (2) implementing {@link LinkedElement} interface.
*/
@InterfaceAudience.Private
public class LightWeightResizableGSet<K, E extends K>
extends LightWeightGSet<K, E> {
/**
* The default initial capacity - MUST be a power of two.
*/
static final int DEFAULT_INITIAL_CAPACITY = 1 << 4;
/**
* The load factor used when none specified in constructor.
*/
static final float DEFAULT_LOAD_FACTOR = 0.75f;
/** Size of the entry table. */
private int capacity;
/**
* The load factor for the hash set.
*/
private final float loadFactor;
private int threshold;
public LightWeightResizableGSet(int initCapacity, float loadFactor) {
if (initCapacity < 0) {
throw new HadoopIllegalArgumentException("Illegal initial capacity: " +
initCapacity);
}
if (loadFactor <= 0 || loadFactor > 1.0f) {
throw new HadoopIllegalArgumentException("Illegal load factor: " +
loadFactor);
}
this.capacity = actualArrayLength(initCapacity);
this.hash_mask = capacity - 1;
this.loadFactor = loadFactor;
this.threshold = (int) (capacity * loadFactor);
entries = new LinkedElement[capacity];
}
public LightWeightResizableGSet() {
this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR);
}
public LightWeightResizableGSet(int initCapacity) {
this(initCapacity, DEFAULT_LOAD_FACTOR);
}
@Override
public E put(final E element) {
E existing = super.put(element);
expandIfNecessary();
return existing;
}
/**
* Resize the internal table to given capacity.
*/
@SuppressWarnings("unchecked")
protected void resize(int cap) {
int newCapacity = actualArrayLength(cap);
if (newCapacity == this.capacity) {
return;
}
this.capacity = newCapacity;
this.threshold = (int) (capacity * loadFactor);
this.hash_mask = capacity - 1;
LinkedElement[] oldEntries = entries;
entries = new LinkedElement[capacity];
for (int i = 0; i < oldEntries.length; i++) {
LinkedElement e = oldEntries[i];
while (e != null) {
LinkedElement next = e.getNext();
int index = getIndex((E)e);
e.setNext(entries[index]);
entries[index] = e;
e = next;
}
}
}
/**
* Checks if we need to expand, and expands if necessary.
*/
protected void expandIfNecessary() {
if (size > this.threshold && capacity < MAX_ARRAY_LENGTH) {
resize(capacity * 2);
}
}
}

View File

@ -17,6 +17,7 @@
*/ */
package org.apache.hadoop.util; package org.apache.hadoop.util;
import java.util.Collection;
import java.util.ConcurrentModificationException; import java.util.ConcurrentModificationException;
import java.util.Iterator; import java.util.Iterator;
import java.util.Random; import java.util.Random;
@ -41,10 +42,15 @@ private static void println(Object s) {
@Test @Test
public void testExceptionCases() { public void testExceptionCases() {
testExceptionCases(false);
testExceptionCases(true);
}
private void testExceptionCases(boolean resizable) {
{ {
//test contains //test contains
final LightWeightGSet<Integer, Integer> gset final LightWeightGSet<Integer, Integer> gset
= new LightWeightGSet<Integer, Integer>(16); = createGSet(16, resizable);
try { try {
//test contains with a null element //test contains with a null element
gset.contains(null); gset.contains(null);
@ -57,7 +63,7 @@ public void testExceptionCases() {
{ {
//test get //test get
final LightWeightGSet<Integer, Integer> gset final LightWeightGSet<Integer, Integer> gset
= new LightWeightGSet<Integer, Integer>(16); = createGSet(16, resizable);
try { try {
//test get with a null element //test get with a null element
gset.get(null); gset.get(null);
@ -70,7 +76,7 @@ public void testExceptionCases() {
{ {
//test put //test put
final LightWeightGSet<Integer, Integer> gset final LightWeightGSet<Integer, Integer> gset
= new LightWeightGSet<Integer, Integer>(16); = createGSet(16, resizable);
try { try {
//test put with a null element //test put with a null element
gset.put(null); gset.put(null);
@ -97,7 +103,7 @@ public void testExceptionCases() {
for(int v = 1; v < data.length-1; v++) { for(int v = 1; v < data.length-1; v++) {
{ {
//test remove while iterating //test remove while iterating
final GSet<IntElement, IntElement> gset = createGSet(data); final GSet<IntElement, IntElement> gset = createGSet(data, resizable);
for(IntElement i : gset) { for(IntElement i : gset) {
if (i.value == v) { if (i.value == v) {
//okay because data[0] is not in gset //okay because data[0] is not in gset
@ -120,7 +126,7 @@ public void testExceptionCases() {
{ {
//test put new element while iterating //test put new element while iterating
final GSet<IntElement, IntElement> gset = createGSet(data); final GSet<IntElement, IntElement> gset = createGSet(data, resizable);
try { try {
for(IntElement i : gset) { for(IntElement i : gset) {
if (i.value == v) { if (i.value == v) {
@ -135,7 +141,7 @@ public void testExceptionCases() {
{ {
//test put existing element while iterating //test put existing element while iterating
final GSet<IntElement, IntElement> gset = createGSet(data); final GSet<IntElement, IntElement> gset = createGSet(data, resizable);
try { try {
for(IntElement i : gset) { for(IntElement i : gset) {
if (i.value == v) { if (i.value == v) {
@ -151,9 +157,17 @@ public void testExceptionCases() {
} }
} }
private static GSet<IntElement, IntElement> createGSet(final IntElement[] data) { private static LightWeightGSet<Integer, Integer> createGSet(
int size, boolean resizable) {
return resizable ? new LightWeightResizableGSet<Integer, Integer>(size) :
new LightWeightGSet<Integer, Integer>(size);
}
private static GSet<IntElement, IntElement> createGSet(
final IntElement[] data, boolean resizable) {
final GSet<IntElement, IntElement> gset final GSet<IntElement, IntElement> gset
= new LightWeightGSet<IntElement, IntElement>(8); = resizable ? new LightWeightResizableGSet<IntElement, IntElement>(8) :
new LightWeightGSet<IntElement, IntElement>(8);
for(int i = 1; i < data.length; i++) { for(int i = 1; i < data.length; i++) {
gset.put(data[i]); gset.put(data[i]);
} }
@ -168,6 +182,14 @@ public void testGSet() {
check(new GSetTestCase(255, 1 << 10, 65537)); check(new GSetTestCase(255, 1 << 10, 65537));
} }
@Test
public void testResizableGSet() {
//The parameters are: table length, data size, modulus, resizable.
check(new GSetTestCase(1, 1 << 4, 65537, true));
check(new GSetTestCase(17, 1 << 16, 17, true));
check(new GSetTestCase(255, 1 << 10, 65537, true));
}
/** /**
* A long running test with various data sets and parameters. * A long running test with various data sets and parameters.
* It may take ~5 hours, * It may take ~5 hours,
@ -177,14 +199,25 @@ public void testGSet() {
//@Test //@Test
public void runMultipleTestGSet() { public void runMultipleTestGSet() {
for(int offset = -2; offset <= 2; offset++) { for(int offset = -2; offset <= 2; offset++) {
runTestGSet(1, offset); runTestGSet(1, offset, false);
for(int i = 1; i < Integer.SIZE - 1; i++) { for(int i = 1; i < Integer.SIZE - 1; i++) {
runTestGSet((1 << i) + 1, offset); runTestGSet((1 << i) + 1, offset, false);
} }
} }
} }
private static void runTestGSet(final int modulus, final int offset) { //@Test
public void runMultipleTestResizableGSet() {
for(int offset = -2; offset <= 2; offset++) {
runTestGSet(1, offset, true);
for(int i = 1; i < Integer.SIZE - 1; i++) {
runTestGSet((1 << i) + 1, offset, true);
}
}
}
private static void runTestGSet(final int modulus, final int offset,
boolean resizable) {
println("\n\nmodulus=" + modulus + ", offset=" + offset); println("\n\nmodulus=" + modulus + ", offset=" + offset);
for(int i = 0; i <= 16; i += 4) { for(int i = 0; i <= 16; i += 4) {
final int tablelength = (1 << i) + offset; final int tablelength = (1 << i) + offset;
@ -194,7 +227,7 @@ private static void runTestGSet(final int modulus, final int offset) {
for(int j = 0; j <= upper; j += steps) { for(int j = 0; j <= upper; j += steps) {
final int datasize = 1 << j; final int datasize = 1 << j;
check(new GSetTestCase(tablelength, datasize, modulus)); check(new GSetTestCase(tablelength, datasize, modulus, resizable));
} }
} }
} }
@ -265,6 +298,10 @@ private static class GSetTestCase implements GSet<IntElement, IntElement> {
int contain_count = 0; int contain_count = 0;
GSetTestCase(int tablelength, int datasize, int modulus) { GSetTestCase(int tablelength, int datasize, int modulus) {
this(tablelength, datasize, modulus, false);
}
GSetTestCase(int tablelength, int datasize, int modulus, boolean resizable) {
denominator = Math.min((datasize >> 7) + 1, 1 << 16); denominator = Math.min((datasize >> 7) + 1, 1 << 16);
info = getClass().getSimpleName() info = getClass().getSimpleName()
+ ": tablelength=" + tablelength + ": tablelength=" + tablelength
@ -274,7 +311,8 @@ private static class GSetTestCase implements GSet<IntElement, IntElement> {
println(info); println(info);
data = new IntData(datasize, modulus); data = new IntData(datasize, modulus);
gset = new LightWeightGSet<IntElement, IntElement>(tablelength); gset = resizable ? new LightWeightResizableGSet<IntElement, IntElement>() :
new LightWeightGSet<IntElement, IntElement>(tablelength);
Assert.assertEquals(0, gset.size()); Assert.assertEquals(0, gset.size());
} }
@ -392,6 +430,11 @@ public void clear() {
gset.clear(); gset.clear();
Assert.assertEquals(0, size()); Assert.assertEquals(0, size());
} }
@Override
public Collection<IntElement> values() {
throw new UnsupportedOperationException();
}
} }
/** Test data set */ /** Test data set */

View File

@ -17,6 +17,7 @@
*/ */
package org.apache.hadoop.util; package org.apache.hadoop.util;
import java.util.Collection;
import java.util.Date; import java.util.Date;
import java.util.Iterator; import java.util.Iterator;
import java.util.Random; import java.util.Random;
@ -379,6 +380,11 @@ public void clear() {
cache.clear(); cache.clear();
Assert.assertEquals(0, size()); Assert.assertEquals(0, size());
} }
@Override
public Collection<IntEntry> values() {
throw new UnsupportedOperationException();
}
} }
private static class IntData { private static class IntData {

View File

@ -0,0 +1,252 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.util;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Random;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.Test;
import static org.junit.Assert.*;
/** Testing {@link LightWeightResizableGSet} */
public class TestLightWeightResizableGSet {
public static final Log LOG = LogFactory.getLog(TestLightWeightResizableGSet.class);
private Random random = new Random();
private TestElement[] generateElements(int length) {
TestElement[] elements = new TestElement[length];
Set<Long> keys = new HashSet<>();
long k = 0;
for (int i = 0; i < length; i++) {
while (keys.contains(k = random.nextLong()));
elements[i] = new TestElement(k, random.nextLong());
keys.add(k);
}
return elements;
}
private TestKey[] getKeys(TestElement[] elements) {
TestKey[] keys = new TestKey[elements.length];
for (int i = 0; i < elements.length; i++) {
keys[i] = new TestKey(elements[i].getKey());
}
return keys;
}
private TestElement[] generateElements(TestKey[] keys) {
TestElement[] elements = new TestElement[keys.length];
for (int i = 0; i < keys.length; i++) {
elements[i] = new TestElement(keys[i], random.nextLong());
}
return elements;
}
private static class TestKey {
private final long key;
TestKey(long key) {
this.key = key;
}
TestKey(TestKey other) {
this.key = other.key;
}
long getKey() {
return key;
}
@Override
public int hashCode() {
return (int)(key^(key>>>32));
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof TestKey)) {
return false;
}
TestKey other = (TestKey)o;
return key == other.key;
}
}
private static class TestElement extends TestKey
implements LightWeightResizableGSet.LinkedElement {
private final long data;
private LightWeightResizableGSet.LinkedElement next;
TestElement(long key, long data) {
super(key);
this.data = data;
}
TestElement(TestKey key, long data) {
super(key);
this.data = data;
}
long getData() {
return data;
}
@Override
public void setNext(LightWeightResizableGSet.LinkedElement next) {
this.next = next;
}
@Override
public LightWeightResizableGSet.LinkedElement getNext() {
return next;
}
}
@Test(timeout = 60000)
public void testBasicOperations() {
TestElement[] elements = generateElements(1 << 16);
final LightWeightResizableGSet<TestKey, TestElement> set =
new LightWeightResizableGSet<TestKey, TestElement>();
assertEquals(set.size(), 0);
// put all elements
for (int i = 0; i < elements.length; i++) {
TestElement element = set.put(elements[i]);
assertTrue(element == null);
}
// check the set size
assertEquals(set.size(), elements.length);
// check all elements exist in the set and the data is correct
for (int i = 0; i < elements.length; i++) {
assertTrue(set.contains(elements[i]));
TestElement element = set.get(elements[i]);
assertEquals(elements[i].getData(), element.getData());
}
TestKey[] keys = getKeys(elements);
// generate new elements with same key, but new data
TestElement[] newElements = generateElements(keys);
// update the set
for (int i = 0; i < newElements.length; i++) {
TestElement element = set.put(newElements[i]);
assertTrue(element != null);
}
// check the set size
assertEquals(set.size(), elements.length);
// check all elements exist in the set and the data is updated to new value
for (int i = 0; i < keys.length; i++) {
assertTrue(set.contains(keys[i]));
TestElement element = set.get(keys[i]);
assertEquals(newElements[i].getData(), element.getData());
}
// test LightWeightHashGSet#values
Collection<TestElement> cElements = set.values();
assertEquals(cElements.size(), elements.length);
for (TestElement element : cElements) {
assertTrue(set.contains(element));
}
// remove elements
for (int i = 0; i < keys.length; i++) {
TestElement element = set.remove(keys[i]);
assertTrue(element != null);
// the element should not exist after remove
assertFalse(set.contains(keys[i]));
}
// check the set size
assertEquals(set.size(), 0);
}
@Test(timeout = 60000)
public void testRemoveAll() {
TestElement[] elements = generateElements(1 << 16);
final LightWeightResizableGSet<TestKey, TestElement> set =
new LightWeightResizableGSet<TestKey, TestElement>();
assertEquals(set.size(), 0);
// put all elements
for (int i = 0; i < elements.length; i++) {
TestElement element = set.put(elements[i]);
assertTrue(element == null);
}
// check the set size
assertEquals(set.size(), elements.length);
// remove all through clear
{
set.clear();
assertEquals(set.size(), 0);
// check all elements removed
for (int i = 0; i < elements.length; i++) {
assertFalse(set.contains(elements[i]));
}
assertFalse(set.iterator().hasNext());
}
// put all elements back
for (int i = 0; i < elements.length; i++) {
TestElement element = set.put(elements[i]);
assertTrue(element == null);
}
// remove all through iterator
{
for (Iterator<TestElement> iter = set.iterator(); iter.hasNext(); ) {
TestElement element = iter.next();
// element should be there before removing
assertTrue(set.contains(element));
iter.remove();
// element should not be there now
assertFalse(set.contains(element));
}
// the deleted elements should not be there
for (int i = 0; i < elements.length; i++) {
assertFalse(set.contains(elements[i]));
}
// iterator should not have next
assertFalse(set.iterator().hasNext());
// check the set size
assertEquals(set.size(), 0);
}
}
}

View File

@ -991,6 +991,9 @@ Release 2.8.0 - UNRELEASED
HDFS-9148. Incorrect assert message in TestWriteToReplica#testWriteToTemporary HDFS-9148. Incorrect assert message in TestWriteToReplica#testWriteToTemporary
(Tony Wu via lei) (Tony Wu via lei)
HDFS-8859. Improve DataNode ReplicaMap memory footprint to save about 45%.
(yliu)
OPTIMIZATIONS OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

View File

@ -18,20 +18,13 @@
package org.apache.hadoop.hdfs.server.datanode; package org.apache.hadoop.hdfs.server.datanode;
import java.io.File; import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.HardLink;
import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.util.LightWeightResizableGSet;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
@ -40,8 +33,12 @@
* It provides a general interface for meta information of a replica. * It provides a general interface for meta information of a replica.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
abstract public class ReplicaInfo extends Block implements Replica { abstract public class ReplicaInfo extends Block
implements Replica, LightWeightResizableGSet.LinkedElement {
/** For implementing {@link LightWeightResizableGSet.LinkedElement} interface */
private LightWeightResizableGSet.LinkedElement next;
/** volume where the replica belongs */ /** volume where the replica belongs */
private FsVolumeSpi volume; private FsVolumeSpi volume;
@ -229,4 +226,14 @@ public String toString() {
public boolean isOnTransientStorage() { public boolean isOnTransientStorage() {
return volume.isTransientStorage(); return volume.isTransientStorage();
} }
@Override
public LightWeightResizableGSet.LinkedElement getNext() {
return next;
}
@Override
public void setNext(LightWeightResizableGSet.LinkedElement next) {
this.next = next;
}
} }

View File

@ -743,7 +743,12 @@ private boolean readReplicasFromCache(ReplicaMap volumeMap,
// Now it is safe to add the replica into volumeMap // Now it is safe to add the replica into volumeMap
// In case of any exception during parsing this cache file, fall back // In case of any exception during parsing this cache file, fall back
// to scan all the files on disk. // to scan all the files on disk.
for (ReplicaInfo info: tmpReplicaMap.replicas(bpid)) { for (Iterator<ReplicaInfo> iter =
tmpReplicaMap.replicas(bpid).iterator(); iter.hasNext(); ) {
ReplicaInfo info = iter.next();
// We use a lightweight GSet to store replicaInfo, we need to remove
// it from one GSet before adding to another.
iter.remove();
volumeMap.add(bpid, info); volumeMap.add(bpid, info);
} }
LOG.info("Successfully read replica from cache file : " LOG.info("Successfully read replica from cache file : "

View File

@ -24,6 +24,7 @@
import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo; import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
import org.apache.hadoop.util.LightWeightResizableGSet;
/** /**
* Maintains the replica map. * Maintains the replica map.
@ -33,9 +34,9 @@ class ReplicaMap {
private final Object mutex; private final Object mutex;
// Map of block pool Id to another map of block Id to ReplicaInfo. // Map of block pool Id to another map of block Id to ReplicaInfo.
private final Map<String, Map<Long, ReplicaInfo>> map = private final Map<String, LightWeightResizableGSet<Block, ReplicaInfo>> map =
new HashMap<String, Map<Long, ReplicaInfo>>(); new HashMap<String, LightWeightResizableGSet<Block, ReplicaInfo>>();
ReplicaMap(Object mutex) { ReplicaMap(Object mutex) {
if (mutex == null) { if (mutex == null) {
throw new HadoopIllegalArgumentException( throw new HadoopIllegalArgumentException(
@ -91,8 +92,8 @@ ReplicaInfo get(String bpid, Block block) {
ReplicaInfo get(String bpid, long blockId) { ReplicaInfo get(String bpid, long blockId) {
checkBlockPool(bpid); checkBlockPool(bpid);
synchronized(mutex) { synchronized(mutex) {
Map<Long, ReplicaInfo> m = map.get(bpid); LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
return m != null ? m.get(blockId) : null; return m != null ? m.get(new Block(blockId)) : null;
} }
} }
@ -108,13 +109,13 @@ ReplicaInfo add(String bpid, ReplicaInfo replicaInfo) {
checkBlockPool(bpid); checkBlockPool(bpid);
checkBlock(replicaInfo); checkBlock(replicaInfo);
synchronized(mutex) { synchronized(mutex) {
Map<Long, ReplicaInfo> m = map.get(bpid); LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
if (m == null) { if (m == null) {
// Add an entry for block pool if it does not exist already // Add an entry for block pool if it does not exist already
m = new HashMap<Long, ReplicaInfo>(); m = new LightWeightResizableGSet<Block, ReplicaInfo>();
map.put(bpid, m); map.put(bpid, m);
} }
return m.put(replicaInfo.getBlockId(), replicaInfo); return m.put(replicaInfo);
} }
} }
@ -137,14 +138,13 @@ ReplicaInfo remove(String bpid, Block block) {
checkBlockPool(bpid); checkBlockPool(bpid);
checkBlock(block); checkBlock(block);
synchronized(mutex) { synchronized(mutex) {
Map<Long, ReplicaInfo> m = map.get(bpid); LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
if (m != null) { if (m != null) {
Long key = Long.valueOf(block.getBlockId()); ReplicaInfo replicaInfo = m.get(block);
ReplicaInfo replicaInfo = m.get(key);
if (replicaInfo != null && if (replicaInfo != null &&
block.getGenerationStamp() == replicaInfo.getGenerationStamp()) { block.getGenerationStamp() == replicaInfo.getGenerationStamp()) {
return m.remove(key); return m.remove(block);
} }
} }
} }
@ -160,9 +160,9 @@ ReplicaInfo remove(String bpid, Block block) {
ReplicaInfo remove(String bpid, long blockId) { ReplicaInfo remove(String bpid, long blockId) {
checkBlockPool(bpid); checkBlockPool(bpid);
synchronized(mutex) { synchronized(mutex) {
Map<Long, ReplicaInfo> m = map.get(bpid); LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
if (m != null) { if (m != null) {
return m.remove(blockId); return m.remove(new Block(blockId));
} }
} }
return null; return null;
@ -174,7 +174,7 @@ ReplicaInfo remove(String bpid, long blockId) {
* @return the number of replicas in the map * @return the number of replicas in the map
*/ */
int size(String bpid) { int size(String bpid) {
Map<Long, ReplicaInfo> m = null; LightWeightResizableGSet<Block, ReplicaInfo> m = null;
synchronized(mutex) { synchronized(mutex) {
m = map.get(bpid); m = map.get(bpid);
return m != null ? m.size() : 0; return m != null ? m.size() : 0;
@ -192,7 +192,7 @@ int size(String bpid) {
* @return a collection of the replicas belonging to the block pool * @return a collection of the replicas belonging to the block pool
*/ */
Collection<ReplicaInfo> replicas(String bpid) { Collection<ReplicaInfo> replicas(String bpid) {
Map<Long, ReplicaInfo> m = null; LightWeightResizableGSet<Block, ReplicaInfo> m = null;
m = map.get(bpid); m = map.get(bpid);
return m != null ? m.values() : null; return m != null ? m.values() : null;
} }
@ -200,10 +200,10 @@ Collection<ReplicaInfo> replicas(String bpid) {
void initBlockPool(String bpid) { void initBlockPool(String bpid) {
checkBlockPool(bpid); checkBlockPool(bpid);
synchronized(mutex) { synchronized(mutex) {
Map<Long, ReplicaInfo> m = map.get(bpid); LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
if (m == null) { if (m == null) {
// Add an entry for block pool if it does not exist already // Add an entry for block pool if it does not exist already
m = new HashMap<Long, ReplicaInfo>(); m = new LightWeightResizableGSet<Block, ReplicaInfo>();
map.put(bpid, m); map.put(bpid, m);
} }
} }