YARN-9040. Fixed memory leak in LevelDBCacheTimelineStore and DBIterator.

Contributed by Tarun Parimi
This commit is contained in:
Eric Yang 2018-12-17 12:04:25 -05:00
parent 346c0c8aff
commit 71e0b0d800
4 changed files with 99 additions and 59 deletions

View File

@ -42,7 +42,6 @@ import java.util.Comparator;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@ -50,6 +49,7 @@ import java.util.Set;
import java.util.SortedSet;
import static org.apache.hadoop.yarn.server.timeline.TimelineDataManager.DEFAULT_DOMAIN_ID;
import static org.apache.hadoop.yarn.server.timeline.TimelineStoreMapAdapter.CloseableIterator;
/**
* Map based implementation of {@link TimelineStore}. A hash map
@ -114,66 +114,67 @@ abstract class KeyValueBasedTimelineStore
fields = EnumSet.allOf(Field.class);
}
Iterator<TimelineEntity> entityIterator = null;
TimelineEntity firstEntity = null;
if (fromId != null) {
TimelineEntity firstEntity = entities.get(new EntityIdentifier(fromId,
firstEntity = entities.get(new EntityIdentifier(fromId,
entityType));
if (firstEntity == null) {
return new TimelineEntities();
} else {
entityIterator = entities.valueSetIterator(firstEntity);
}
}
if (entityIterator == null) {
entityIterator = entities.valueSetIterator();
}
List<TimelineEntity> entitiesSelected = new ArrayList<TimelineEntity>();
while (entityIterator.hasNext()) {
TimelineEntity entity = entityIterator.next();
if (entitiesSelected.size() >= limit) {
break;
}
if (!entity.getEntityType().equals(entityType)) {
continue;
}
if (entity.getStartTime() <= windowStart) {
continue;
}
if (entity.getStartTime() > windowEnd) {
continue;
}
if (fromTs != null && entityInsertTimes.get(new EntityIdentifier(
entity.getEntityId(), entity.getEntityType())) > fromTs) {
continue;
}
if (primaryFilter != null &&
!KeyValueBasedTimelineStoreUtils.matchPrimaryFilter(
entity.getPrimaryFilters(), primaryFilter)) {
continue;
}
if (secondaryFilters != null) { // AND logic
boolean flag = true;
for (NameValuePair secondaryFilter : secondaryFilters) {
if (secondaryFilter != null && !KeyValueBasedTimelineStoreUtils
.matchPrimaryFilter(entity.getPrimaryFilters(), secondaryFilter)
&& !KeyValueBasedTimelineStoreUtils.matchFilter(
entity.getOtherInfo(), secondaryFilter)) {
flag = false;
break;
}
try(CloseableIterator<TimelineEntity> entityIterator =
firstEntity == null ? entities.valueSetIterator() :
entities.valueSetIterator(firstEntity)) {
while (entityIterator.hasNext()) {
TimelineEntity entity = entityIterator.next();
if (entitiesSelected.size() >= limit) {
break;
}
if (!flag) {
if (!entity.getEntityType().equals(entityType)) {
continue;
}
}
if (entity.getDomainId() == null) {
entity.setDomainId(DEFAULT_DOMAIN_ID);
}
if (checkAcl == null || checkAcl.check(entity)) {
entitiesSelected.add(entity);
if (entity.getStartTime() <= windowStart) {
continue;
}
if (entity.getStartTime() > windowEnd) {
continue;
}
if (fromTs != null && entityInsertTimes.get(
new EntityIdentifier(entity.getEntityId(), entity.getEntityType()))
> fromTs) {
continue;
}
if (primaryFilter != null && !KeyValueBasedTimelineStoreUtils
.matchPrimaryFilter(entity.getPrimaryFilters(), primaryFilter)) {
continue;
}
if (secondaryFilters != null) { // AND logic
boolean flag = true;
for (NameValuePair secondaryFilter : secondaryFilters) {
if (secondaryFilter != null && !KeyValueBasedTimelineStoreUtils
.matchPrimaryFilter(entity.getPrimaryFilters(), secondaryFilter)
&& !KeyValueBasedTimelineStoreUtils
.matchFilter(entity.getOtherInfo(), secondaryFilter)) {
flag = false;
break;
}
}
if (!flag) {
continue;
}
}
if (entity.getDomainId() == null) {
entity.setDomainId(DEFAULT_DOMAIN_ID);
}
if (checkAcl == null || checkAcl.check(entity)) {
entitiesSelected.add(entity);
}
}
}
List<TimelineEntity> entitiesToReturn = new ArrayList<TimelineEntity>();
for (TimelineEntity entitySelected : entitiesSelected) {
entitiesToReturn.add(KeyValueBasedTimelineStoreUtils.maskFields(
@ -569,6 +570,7 @@ abstract class KeyValueBasedTimelineStore
}
return o;
}
}
}

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.timeline;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
@ -59,14 +60,15 @@ public class MemoryTimelineStore extends KeyValueBasedTimelineStore {
}
@Override
public Iterator<V>
public CloseableIterator<V>
valueSetIterator() {
return new TreeSet<>(internalMap.values()).iterator();
return wrapClosableIterator(new TreeSet<>(internalMap.values())
.iterator());
}
@Override
@SuppressWarnings("unchecked")
public Iterator<V> valueSetIterator(V minV) {
public CloseableIterator<V> valueSetIterator(V minV) {
if (minV instanceof Comparable) {
TreeSet<V> tempTreeSet = new TreeSet<>();
for (V value : internalMap.values()) {
@ -74,11 +76,38 @@ public class MemoryTimelineStore extends KeyValueBasedTimelineStore {
tempTreeSet.add(value);
}
}
return tempTreeSet.iterator();
return wrapClosableIterator(tempTreeSet.iterator());
} else {
return valueSetIterator();
}
}
private CloseableIterator<V> wrapClosableIterator(
final Iterator<V> iterator) {
return new CloseableIterator<V>() {
private final Iterator<V> internalIterator = iterator;
@Override
public void close() throws IOException {
// Not implemented
}
@Override
public boolean hasNext() {
return internalIterator.hasNext();
}
@Override
public V next() {
return internalIterator.next();
}
@Override
public void remove() {
internalIterator.remove();
}
};
}
}
public MemoryTimelineStore() {

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.timeline;
import java.io.Closeable;
import java.util.Iterator;
/**
@ -48,7 +49,7 @@ interface TimelineStoreMapAdapter<K, V> {
/**
* @return the iterator of the value set of the map
*/
Iterator<V> valueSetIterator();
CloseableIterator<V> valueSetIterator();
/**
* Return the iterator of the value set of the map, starting from minV if type
@ -56,5 +57,9 @@ interface TimelineStoreMapAdapter<K, V> {
* @param minV
* @return
*/
Iterator<V> valueSetIterator(V minV);
CloseableIterator<V> valueSetIterator(V minV);
interface CloseableIterator<V> extends Iterator<V>, Closeable {}
}

View File

@ -37,7 +37,6 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
/**
@ -211,18 +210,18 @@ public class LevelDBCacheTimelineStore extends KeyValueBasedTimelineStore {
}
@Override
public Iterator<V> valueSetIterator() {
public CloseableIterator<V> valueSetIterator() {
return getIterator(null, Long.MAX_VALUE);
}
@Override
public Iterator<V> valueSetIterator(V minV) {
public CloseableIterator<V> valueSetIterator(V minV) {
return getIterator(
new EntityIdentifier(minV.getEntityId(), minV.getEntityType()),
minV.getStartTime());
}
private Iterator<V> getIterator(
private CloseableIterator<V> getIterator(
EntityIdentifier startId, long startTimeMax) {
final DBIterator internalDbIterator = entityDb.iterator();
@ -247,7 +246,7 @@ public class LevelDBCacheTimelineStore extends KeyValueBasedTimelineStore {
= entityPrefixKeyBuilder.getBytesForLookup();
internalDbIterator.seek(startPrefixBytes);
return new Iterator<V>() {
return new CloseableIterator<V>() {
@Override
public boolean hasNext() {
if (!internalDbIterator.hasNext()) {
@ -284,6 +283,11 @@ public class LevelDBCacheTimelineStore extends KeyValueBasedTimelineStore {
LOG.error("LevelDB map adapter does not support iterate-and-remove"
+ " use cases. ");
}
@Override
public void close() throws IOException {
internalDbIterator.close();
}
};
}
static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();