diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IndexCache.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IndexCache.java
index 80cbcca4e2..0e24bbe533 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IndexCache.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IndexCache.java
@@ -72,12 +72,11 @@ public IndexRecord getIndexInformation(String mapId, int reduce,
           try {
             info.wait();
           } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
             throw new IOException("Interrupted waiting for construction", e);
           }
         }
       }
-      LOG.debug("IndexCache HIT: MapId {} found", mapId);
+      LOG.debug("IndexCache HIT: MapId " + mapId + " found");
     }
 
     if (info.mapSpillRecord.size() == 0 ||
@@ -107,91 +106,63 @@ private IndexInformation readIndexFileToCache(Path indexFileName,
           try {
             info.wait();
           } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
             throw new IOException("Interrupted waiting for construction", e);
           }
         }
       }
-      LOG.debug("IndexCache HIT: MapId {} found", mapId);
+      LOG.debug("IndexCache HIT: MapId " + mapId + " found");
       return info;
     }
-    LOG.debug("IndexCache MISS: MapId {} not found", mapId);
+    LOG.debug("IndexCache MISS: MapId " + mapId + " not found");
     SpillRecord tmp = null;
-    boolean success = false;
     try {
       tmp = new SpillRecord(indexFileName, conf, expectedIndexOwner);
-      success = true;
-    } catch (Throwable e) {
+    } catch (Throwable e) {
       tmp = new SpillRecord(0);
       cache.remove(mapId);
-      if (e instanceof InterruptedException) {
-        Thread.currentThread().interrupt();
-      }
       throw new IOException("Error Reading IndexFile", e);
-    } finally {
-      synchronized (newInd) {
+    } finally {
+      synchronized (newInd) {
         newInd.mapSpillRecord = tmp;
-        if (success) {
-          // Only add mapId to the queue for successful read and after added to
-          // the cache. Once in the queue, it is now eligible for removal once
-          // construction is finished.
-          queue.add(mapId);
-          if (totalMemoryUsed.addAndGet(newInd.getSize()) > totalMemoryAllowed) {
-            freeIndexInformation();
-          }
-        }
         newInd.notifyAll();
       }
     }
-
+    queue.add(mapId);
+
+    if (totalMemoryUsed.addAndGet(newInd.getSize()) > totalMemoryAllowed) {
+      freeIndexInformation();
+    }
     return newInd;
   }
 
   /**
-   * This method removes the map from the cache if it is present in the queue.
+   * This method removes the map from the cache if index information for this
+   * map is loaded (size > 0). The cache entry is not removed while it is
+   * still in the loading phase (size = 0); this prevents corruption of
+   * totalMemoryUsed. It should be called when a map output on this tracker
+   * is discarded.
    * @param mapId The taskID of this map.
    */
-  public void removeMap(String mapId) throws IOException {
-    // Successfully removing the mapId from the queue enters into a contract
-    // that this thread will remove the corresponding mapId from the cache.
-    if (!queue.remove(mapId)) {
-      LOG.debug("Map ID {} not found in queue", mapId);
+  public void removeMap(String mapId) {
+    IndexInformation info = cache.get(mapId);
+    if (info == null || isUnderConstruction(info)) {
       return;
     }
-    removeMapInternal(mapId);
-  }
-
-  /** This method should only be called upon successful removal of mapId from
-   * the queue. The mapId will be removed from the cache and totalUsedMemory
-   * will be decremented.
-   * @param mapId the cache item to be removed
-   * @throws IOException
-   */
-  private void removeMapInternal(String mapId) throws IOException {
-    IndexInformation info = cache.remove(mapId);
-    if (info == null) {
-      // Inconsistent state as presence in queue implies presence in cache
-      LOG.warn("Map ID " + mapId + " not found in cache");
-      return;
-    }
-    try {
-      synchronized(info) {
-        while (isUnderConstruction(info)) {
-          info.wait();
-        }
-        totalMemoryUsed.getAndAdd(-info.getSize());
+    info = cache.remove(mapId);
+    if (info != null) {
+      totalMemoryUsed.addAndGet(-info.getSize());
+      if (!queue.remove(mapId)) {
+        LOG.warn("Map ID " + mapId + " not found in queue!!");
       }
-    } catch (InterruptedException e) {
-      totalMemoryUsed.getAndAdd(-info.getSize());
-      Thread.currentThread().interrupt();
-      throw new IOException("Interrupted waiting for construction", e);
+    } else {
+      LOG.info("Map ID " + mapId + " not found in cache");
     }
   }
 
   /**
-   * This method checks if cache and totalMemoryUsed is consistent.
+   * This method checks if cache and totalMemoryUsed are consistent.
    * It is only used for unit test.
-   * @return True if cache and totalMemoryUsed is consistent
+   * @return True if cache and totalMemoryUsed are consistent
    */
   boolean checkTotalMemoryUsed() {
     int totalSize = 0;
@@ -204,13 +175,13 @@ boolean checkTotalMemoryUsed() {
   /**
    * Bring memory usage below totalMemoryAllowed.
    */
-  private synchronized void freeIndexInformation() throws IOException {
+  private synchronized void freeIndexInformation() {
     while (totalMemoryUsed.get() > totalMemoryAllowed) {
-      if(queue.isEmpty()) {
-        break;
+      String s = queue.remove();
+      IndexInformation info = cache.remove(s);
+      if (info != null) {
+        totalMemoryUsed.addAndGet(-info.getSize());
       }
-      String mapId = queue.remove();
-      removeMapInternal(mapId);
     }
   }
 
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestIndexCache.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestIndexCache.java
index 09db2c680f..dabce770e8 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestIndexCache.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestIndexCache.java
@@ -21,7 +21,6 @@
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.Random;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.zip.CRC32;
 import java.util.zip.CheckedOutputStream;
 
@@ -217,32 +216,23 @@ public void testRemoveMap() throws Exception {
     final String user =
         UserGroupInformation.getCurrentUser().getShortUserName();
     writeFile(fs, big, bytesPerFile, partsPerMap);
-
-    // Capture if any runtime exception occurred
-    AtomicBoolean failed = new AtomicBoolean();
-
+    // run multiple times
     for (int i = 0; i < 20; ++i) {
       Thread getInfoThread = new Thread() {
         @Override
         public void run() {
           try {
-            cache.getIndexInformation("bigIndex", 0, big, user);
+            cache.getIndexInformation("bigIndex", partsPerMap, big, user);
           } catch (Exception e) {
             // should not be here
-            failed.set(true);
           }
         }
       };
       Thread removeMapThread = new Thread() {
         @Override
         public void run() {
-          try {
-            cache.removeMap("bigIndex");
-          } catch (Exception e) {
-            // should not be here
-            failed.set(true);
-          }
+          cache.removeMap("bigIndex");
cache.removeMap("bigIndex"); } }; if (i%2==0) { @@ -254,7 +244,6 @@ public void run() { } getInfoThread.join(); removeMapThread.join(); - assertFalse("An unexpected exception", failed.get()); assertTrue(cache.checkTotalMemoryUsed()); } } @@ -272,9 +261,6 @@ public void testCreateRace() throws Exception { UserGroupInformation.getCurrentUser().getShortUserName(); writeFile(fs, racy, bytesPerFile, partsPerMap); - // Capture if any runtime exception occurred - AtomicBoolean failed = new AtomicBoolean(); - // run multiple instances Thread[] getInfoThreads = new Thread[50]; for (int i = 0; i < 50; i++) { @@ -282,15 +268,10 @@ public void testCreateRace() throws Exception { @Override public void run() { try { - while (!Thread.currentThread().isInterrupted()) { - cache.getIndexInformation("racyIndex", 0, racy, user); - cache.removeMap("racyIndex"); - } + cache.getIndexInformation("racyIndex", partsPerMap, racy, user); + cache.removeMap("racyIndex"); } catch (Exception e) { - if (!Thread.currentThread().isInterrupted()) { - // should not be here - failed.set(true); - } + // should not be here } } }; @@ -300,12 +281,20 @@ public void run() { getInfoThreads[i].start(); } - // The duration to keep the threads testing - Thread.sleep(5000); + final Thread mainTestThread = Thread.currentThread(); + + Thread timeoutThread = new Thread() { + @Override + public void run() { + try { + Thread.sleep(15000); + mainTestThread.interrupt(); + } catch (InterruptedException ie) { + // we are done; + } + } + }; - for (int i = 0; i < 50; i++) { - getInfoThreads[i].interrupt(); - } for (int i = 0; i < 50; i++) { try { getInfoThreads[i].join(); @@ -314,9 +303,10 @@ public void run() { fail("Unexpectedly long delay during concurrent cache entry creations"); } } - assertFalse("An unexpected exception", failed.get()); - assertTrue("Total memory used does not represent contents of the cache", - cache.checkTotalMemoryUsed()); + // stop the timeoutThread. If we get interrupted before stopping, there + // must be something wrong, although it wasn't a deadlock. No need to + // catch and swallow. + timeoutThread.interrupt(); } private static void checkRecord(IndexRecord rec, long fill) {