MAPREDUCE-7150. Optimize collections used by MR JHS to reduce its memory. (Contributed by Misha Dmitriev)

This commit is contained in:
Haibo Chen 2018-10-16 13:44:41 -07:00
parent c2288ac45b
commit babd1449bf
4 changed files with 38 additions and 27 deletions

View File

@ -61,8 +61,9 @@ public abstract class FileSystemCounterGroup<C extends Counter>
// C[] would need Array.newInstance which requires a Class<C> reference. // C[] would need Array.newInstance which requires a Class<C> reference.
// Just a few local casts probably worth not having to carry it around. // Just a few local casts probably worth not having to carry it around.
private final Map<String, Object[]> map = // Initialized lazily, since in some situations millions of empty maps can
new ConcurrentSkipListMap<String, Object[]>(); // waste a substantial (e.g. 4% as we observed) portion of the heap
private Map<String, Object[]> map;
private String displayName; private String displayName;
private static final Joiner NAME_JOINER = Joiner.on('_'); private static final Joiner NAME_JOINER = Joiner.on('_');
@ -214,6 +215,9 @@ public C findCounter(String counterName) {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public synchronized C findCounter(String scheme, FileSystemCounter key) { public synchronized C findCounter(String scheme, FileSystemCounter key) {
final String canonicalScheme = checkScheme(scheme); final String canonicalScheme = checkScheme(scheme);
if (map == null) {
map = new ConcurrentSkipListMap<>();
}
Object[] counters = map.get(canonicalScheme); Object[] counters = map.get(canonicalScheme);
int ord = key.ordinal(); int ord = key.ordinal();
if (counters == null) { if (counters == null) {
@ -247,11 +251,13 @@ private String checkScheme(String scheme) {
protected abstract C newCounter(String scheme, FileSystemCounter key); protected abstract C newCounter(String scheme, FileSystemCounter key);
@Override @Override
public int size() { public synchronized int size() {
int n = 0; int n = 0;
if (map != null) {
for (Object[] counters : map.values()) { for (Object[] counters : map.values()) {
n += numSetCounters(counters); n += numSetCounters(counters);
} }
}
return n; return n;
} }
@ -271,7 +277,8 @@ public void incrAllCounters(CounterGroupBase<C> other) {
* FileSystemGroup ::= #scheme (scheme #counter (key value)*)* * FileSystemGroup ::= #scheme (scheme #counter (key value)*)*
*/ */
@Override @Override
public void write(DataOutput out) throws IOException { public synchronized void write(DataOutput out) throws IOException {
if (map != null) {
WritableUtils.writeVInt(out, map.size()); // #scheme WritableUtils.writeVInt(out, map.size()); // #scheme
for (Map.Entry<String, Object[]> entry : map.entrySet()) { for (Map.Entry<String, Object[]> entry : map.entrySet()) {
WritableUtils.writeString(out, entry.getKey()); // scheme WritableUtils.writeString(out, entry.getKey()); // scheme
@ -285,6 +292,9 @@ public void write(DataOutput out) throws IOException {
WritableUtils.writeVLong(out, c.getValue()); // value WritableUtils.writeVLong(out, c.getValue()); // value
} }
} }
} else {
WritableUtils.writeVInt(out, 0);
}
} }
private int numSetCounters(Object[] counters) { private int numSetCounters(Object[] counters) {
@ -310,8 +320,8 @@ public void readFields(DataInput in) throws IOException {
@Override @Override
public Iterator<C> iterator() { public Iterator<C> iterator() {
return new AbstractIterator<C>() { return new AbstractIterator<C>() {
Iterator<Object[]> it = map.values().iterator(); Iterator<Object[]> it = map != null ? map.values().iterator() : null;
Object[] counters = it.hasNext() ? it.next() : null; Object[] counters = (it != null && it.hasNext()) ? it.next() : null;
int i = 0; int i = 0;
@Override @Override
protected C computeNext() { protected C computeNext() {
@ -322,7 +332,7 @@ protected C computeNext() {
if (counter != null) return counter; if (counter != null) return counter;
} }
i = 0; i = 0;
counters = it.hasNext() ? it.next() : null; counters = (it != null && it.hasNext()) ? it.next() : null;
} }
return endOfData(); return endOfData();
} }
@ -343,9 +353,11 @@ public synchronized boolean equals(Object genericRight) {
public synchronized int hashCode() { public synchronized int hashCode() {
// need to be deep as counters is an array // need to be deep as counters is an array
int hash = FileSystemCounter.class.hashCode(); int hash = FileSystemCounter.class.hashCode();
if (map != null) {
for (Object[] counters : map.values()) { for (Object[] counters : map.values()) {
if (counters != null) hash ^= Arrays.hashCode(counters); if (counters != null) hash ^= Arrays.hashCode(counters);
} }
}
return hash; return hash;
} }
} }

View File

@ -600,7 +600,7 @@ public static class TaskInfo {
public TaskInfo() { public TaskInfo() {
startTime = finishTime = -1; startTime = finishTime = -1;
error = splitLocations = ""; error = splitLocations = "";
attemptsMap = new HashMap<TaskAttemptID, TaskAttemptInfo>(); attemptsMap = new HashMap<TaskAttemptID, TaskAttemptInfo>(2);
} }
public void printAll() { public void printAll() {

View File

@ -20,7 +20,6 @@
import java.util.ArrayList; import java.util.ArrayList;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
@ -48,11 +47,11 @@ public class CompletedTask implements Task {
private final TaskInfo taskInfo; private final TaskInfo taskInfo;
private TaskReport report; private TaskReport report;
private TaskAttemptId successfulAttempt; private TaskAttemptId successfulAttempt;
private List<String> reportDiagnostics = new LinkedList<String>(); private List<String> reportDiagnostics = new ArrayList<String>(2);
private Lock taskAttemptsLock = new ReentrantLock(); private Lock taskAttemptsLock = new ReentrantLock();
private AtomicBoolean taskAttemptsLoaded = new AtomicBoolean(false); private AtomicBoolean taskAttemptsLoaded = new AtomicBoolean(false);
private final Map<TaskAttemptId, TaskAttempt> attempts = private final Map<TaskAttemptId, TaskAttempt> attempts =
new LinkedHashMap<TaskAttemptId, TaskAttempt>(); new LinkedHashMap<TaskAttemptId, TaskAttempt>(2);
CompletedTask(TaskId taskId, TaskInfo taskInfo) { CompletedTask(TaskId taskId, TaskInfo taskInfo) {
//TODO JobHistoryParser.handleTaskFailedAttempt should use state from the event. //TODO JobHistoryParser.handleTaskFailedAttempt should use state from the event.

View File

@ -39,7 +39,7 @@ public class CompletedTaskAttempt implements TaskAttempt {
private final TaskAttemptInfo attemptInfo; private final TaskAttemptInfo attemptInfo;
private final TaskAttemptId attemptId; private final TaskAttemptId attemptId;
private final TaskAttemptState state; private final TaskAttemptState state;
private final List<String> diagnostics = new ArrayList<String>(); private final List<String> diagnostics = new ArrayList<String>(2);
private TaskAttemptReport report; private TaskAttemptReport report;
private String localDiagMessage; private String localDiagMessage;