From ac44e0a0d0cebc07904518f32e0d8e34a4391adf Mon Sep 17 00:00:00 2001 From: Jason Darrell Lowe Date: Mon, 3 Jun 2013 14:46:30 +0000 Subject: [PATCH] MAPREDUCE-5268. Improve history server startup performance. Contributed by Karthik Kambatla git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1489012 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 18 ++++ .../mapreduce/v2/hs/HistoryFileManager.java | 62 +++++++++++++- .../v2/hs/TestJobIdHistoryFileInfoMap.java | 78 ++++++++++++++++++ .../mapreduce/v2/hs/TestJobListCache.java | 82 +++++++++++++++++++ 4 files changed, 237 insertions(+), 3 deletions(-) create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobIdHistoryFileInfoMap.java create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobListCache.java diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 9ae7fdac75..3e376779ff 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -281,6 +281,9 @@ Release 2.1.0-beta - UNRELEASED MAPREDUCE-4974. Optimising the LineRecordReader initialize() method (Gelesh via bobby) + MAPREDUCE-5268. Improve history server startup performance (Karthik + Kambatla via jlowe) + BUG FIXES MAPREDUCE-4671. AM does not tell the RM about container requests which are @@ -1064,6 +1067,21 @@ Release 2.0.0-alpha - 05-23-2012 MAPREDUCE-4444. nodemanager fails to start when one of the local-dirs is bad (Jason Lowe via bobby) +Release 0.23.9 - UNRELEASED + + INCOMPATIBLE CHANGES + + NEW FEATURES + + IMPROVEMENTS + + OPTIMIZATIONS + + MAPREDUCE-5268. 
Improve history server startup performance (Karthik + Kambatla via jlowe) + + BUG FIXES + Release 0.23.8 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java index a1887f72d4..a55e5ad191 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java @@ -26,6 +26,7 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.NavigableSet; import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; @@ -36,6 +37,7 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -131,19 +133,73 @@ public synchronized Set get(String serialPart) { } } - static class JobListCache { + /** + * Wrapper around {@link ConcurrentSkipListMap} that maintains size + * alongside for O(1) size() implementation for use in JobListCache. + * + * Note: The size is not updated atomically with additions/removals. + * This race can lead to size() returning an incorrect size at times.
+ */ + static class JobIdHistoryFileInfoMap { private ConcurrentSkipListMap cache; + private AtomicInteger mapSize; + + JobIdHistoryFileInfoMap() { + cache = new ConcurrentSkipListMap(); + mapSize = new AtomicInteger(); + } + + public HistoryFileInfo putIfAbsent(JobId key, HistoryFileInfo value) { + HistoryFileInfo ret = cache.putIfAbsent(key, value); + if (ret == null) { + mapSize.incrementAndGet(); + } + return ret; + } + + public HistoryFileInfo remove(JobId key) { + HistoryFileInfo ret = cache.remove(key); + if (ret != null) { + mapSize.decrementAndGet(); + } + return ret; + } + + /** + * Returns the recorded size of the internal map. Note that this could be out + * of sync with the actual size of the map + * @return "recorded" size + */ + public int size() { + return mapSize.get(); + } + + public HistoryFileInfo get(JobId key) { + return cache.get(key); + } + + public NavigableSet navigableKeySet() { + return cache.navigableKeySet(); + } + + public Collection values() { + return cache.values(); + } + } + + static class JobListCache { + private JobIdHistoryFileInfoMap cache; private int maxSize; private long maxAge; public JobListCache(int maxSize, long maxAge) { this.maxSize = maxSize; this.maxAge = maxAge; - this.cache = new ConcurrentSkipListMap(); + this.cache = new JobIdHistoryFileInfoMap(); } public HistoryFileInfo addIfAbsent(HistoryFileInfo fileInfo) { - JobId jobId = fileInfo.getJobIndexInfo().getJobId(); + JobId jobId = fileInfo.getJobId(); if (LOG.isDebugEnabled()) { LOG.debug("Adding " + jobId + " to job list cache with " + fileInfo.getJobIndexInfo()); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobIdHistoryFileInfoMap.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobIdHistoryFileInfoMap.java new file mode 100644 index 0000000000..7fbaf8b797 --- /dev/null +++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobIdHistoryFileInfoMap.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapreduce.v2.hs; + +import java.util.Collection; +import java.util.NavigableSet; + +import org.apache.hadoop.mapreduce.v2.api.records.JobId; +import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.HistoryFileInfo; +import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.JobIdHistoryFileInfoMap; +import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; +import org.junit.Test; +import org.mockito.Mockito; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class TestJobIdHistoryFileInfoMap { + + private boolean checkSize(JobIdHistoryFileInfoMap map, int size) + throws InterruptedException { + for (int i = 0; i < 100; i++) { + if (map.size() != size) + Thread.sleep(20); + else + return true; + } + return false; + } + + /** + * Trivial test case that verifies basic functionality of {@link + * JobIdHistoryFileInfoMap} + */ + @Test(timeout = 2000) + public void testWithSingleElement() throws 
InterruptedException { + JobIdHistoryFileInfoMap mapWithSize = new JobIdHistoryFileInfoMap(); + + JobId jobId = MRBuilderUtils.newJobId(1, 1, 1); + HistoryFileInfo fileInfo1 = Mockito.mock(HistoryFileInfo.class); + Mockito.when(fileInfo1.getJobId()).thenReturn(jobId); + + // add it twice + assertEquals("Incorrect return on putIfAbsent()", + null, mapWithSize.putIfAbsent(jobId, fileInfo1)); + assertEquals("Incorrect return on putIfAbsent()", + fileInfo1, mapWithSize.putIfAbsent(jobId, fileInfo1)); + + // check get() + assertEquals("Incorrect get()", fileInfo1, mapWithSize.get(jobId)); + assertTrue("Incorrect size()", checkSize(mapWithSize, 1)); + + // check navigableKeySet() + NavigableSet set = mapWithSize.navigableKeySet(); + assertEquals("Incorrect navigableKeySet()", 1, set.size()); + assertTrue("Incorrect navigableKeySet()", set.contains(jobId)); + + // check values() + Collection values = mapWithSize.values(); + assertEquals("Incorrect values()", 1, values.size()); + assertTrue("Incorrect values()", values.contains(fileInfo1)); + } +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobListCache.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobListCache.java new file mode 100644 index 0000000000..6ebbb7c139 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobListCache.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapreduce.v2.hs; + +import java.lang.InterruptedException; +import java.util.Collection; + +import org.apache.hadoop.mapreduce.v2.api.records.JobId; +import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.HistoryFileInfo; +import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.JobListCache; +import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; +import org.junit.Test; +import org.mockito.Mockito; + +import static org.junit.Assert.*; + +public class TestJobListCache { + + @Test (timeout = 1000) + public void testAddExisting() { + JobListCache cache = new JobListCache(2, 1000); + + JobId jobId = MRBuilderUtils.newJobId(1, 1, 1); + HistoryFileInfo fileInfo = Mockito.mock(HistoryFileInfo.class); + Mockito.when(fileInfo.getJobId()).thenReturn(jobId); + + cache.addIfAbsent(fileInfo); + cache.addIfAbsent(fileInfo); + assertEquals("Incorrect number of cache entries", 1, + cache.values().size()); + } + + @Test (timeout = 1000) + public void testEviction() throws InterruptedException { + int maxSize = 2; + JobListCache cache = new JobListCache(maxSize, 1000); + + JobId jobId1 = MRBuilderUtils.newJobId(1, 1, 1); + HistoryFileInfo fileInfo1 = Mockito.mock(HistoryFileInfo.class); + Mockito.when(fileInfo1.getJobId()).thenReturn(jobId1); + + JobId jobId2 = MRBuilderUtils.newJobId(2, 2, 2); + HistoryFileInfo fileInfo2 = Mockito.mock(HistoryFileInfo.class); + Mockito.when(fileInfo2.getJobId()).thenReturn(jobId2); + + JobId jobId3 = MRBuilderUtils.newJobId(3, 3, 3); + HistoryFileInfo fileInfo3 = 
Mockito.mock(HistoryFileInfo.class); + Mockito.when(fileInfo3.getJobId()).thenReturn(jobId3); + + cache.addIfAbsent(fileInfo1); + cache.addIfAbsent(fileInfo2); + cache.addIfAbsent(fileInfo3); + + Collection values; + for (int i = 0; i < 9; i++) { + values = cache.values(); + if (values.size() > maxSize) { + Thread.sleep(100); + } else { + assertFalse("fileInfo1 should have been evicted", + values.contains(fileInfo1)); + return; + } + } + fail("JobListCache didn't delete the extra entry"); + } +}