diff --git a/mapreduce/CHANGES.txt b/mapreduce/CHANGES.txt index 2613b07631..9d0f0be8f7 100644 --- a/mapreduce/CHANGES.txt +++ b/mapreduce/CHANGES.txt @@ -396,6 +396,9 @@ Trunk (unreleased changes) MAPREDUCE-2837. Ported bug fixes from y-merge to prepare for MAPREDUCE-279 merge. (acmurthy) + MAPREDUCE-2541. Fixed a race condition in IndexCache.removeMap. (Binglin + Chang via acmurthy) + Release 0.22.0 - Unreleased diff --git a/mapreduce/src/java/org/apache/hadoop/mapred/IndexCache.java b/mapreduce/src/java/org/apache/hadoop/mapred/IndexCache.java index 8718c90b2f..63c148d5f2 100644 --- a/mapreduce/src/java/org/apache/hadoop/mapred/IndexCache.java +++ b/mapreduce/src/java/org/apache/hadoop/mapred/IndexCache.java @@ -130,12 +130,19 @@ private IndexInformation readIndexFileToCache(Path indexFileName, } /** - * This method removes the map from the cache. It should be called when - * a map output on this tracker is discarded. + * This method removes the map from the cache if index information for this + * map is loaded(size>0), index information entry in cache will not be + * removed if it is in the loading phrase(size=0), this prevents corruption + * of totalMemoryUsed. It should be called when a map output on this tracker + * is discarded. * @param mapId The taskID of this map. */ public void removeMap(String mapId) { - IndexInformation info = cache.remove(mapId); + IndexInformation info = cache.get(mapId); + if ((info != null) && (info.getSize() == 0)) { + return; + } + info = cache.remove(mapId); if (info != null) { totalMemoryUsed.addAndGet(-info.getSize()); if (!queue.remove(mapId)) { @@ -146,6 +153,19 @@ public void removeMap(String mapId) { } } + /** + * This method checks if cache and totolMemoryUsed is consistent. + * It is only used for unit test. + * @return True if cache and totolMemoryUsed is consistent + */ + boolean checkTotalMemoryUsed() { + int totalSize = 0; + for (IndexInformation info : cache.values()) { + totalSize += info.getSize(); + } + return totalSize == totalMemoryUsed.get(); + } + /** * Bring memory usage below totalMemoryAllowed. */ diff --git a/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestIndexCache.java b/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestIndexCache.java index 2e81865743..e003772264 100644 --- a/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestIndexCache.java +++ b/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestIndexCache.java @@ -193,6 +193,60 @@ public void testInvalidReduceNumberOrLength() throws Exception { } } + public void testRemoveMap() throws Exception { + // This test case use two thread to call getIndexInformation and + // removeMap concurrently, in order to construct race condition. + // This test case may not repeatable. But on my macbook this test + // fails with probability of 100% on code before MAPREDUCE-2541, + // so it is repeatable in practice. + JobConf conf = new JobConf(); + FileSystem fs = FileSystem.getLocal(conf).getRaw(); + Path p = new Path(System.getProperty("test.build.data", "/tmp"), + "cache").makeQualified(fs); + fs.delete(p, true); + conf.setInt(TTConfig.TT_INDEX_CACHE, 10); + // Make a big file so removeMapThread almost surely runs faster than + // getInfoThread + final int partsPerMap = 100000; + final int bytesPerFile = partsPerMap * 24; + final IndexCache cache = new IndexCache(conf); + + final Path big = new Path(p, "bigIndex"); + final String user = + UserGroupInformation.getCurrentUser().getShortUserName(); + writeFile(fs, big, bytesPerFile, partsPerMap); + + // run multiple times + for (int i = 0; i < 20; ++i) { + Thread getInfoThread = new Thread() { + @Override + public void run() { + try { + cache.getIndexInformation("bigIndex", partsPerMap, big, user); + } catch (Exception e) { + // should not be here + } + } + }; + Thread removeMapThread = new Thread() { + @Override + public void run() { + cache.removeMap("bigIndex"); + } + }; + if (i%2==0) { + getInfoThread.start(); + removeMapThread.start(); + } else { + removeMapThread.start(); + getInfoThread.start(); + } + getInfoThread.join(); + removeMapThread.join(); + assertEquals(true, cache.checkTotalMemoryUsed()); + } + } + private static void checkRecord(IndexRecord rec, long fill) { assertEquals(fill, rec.startOffset); assertEquals(fill, rec.rawLength);