Revert "MAPREDUCE-7277. IndexCache totalMemoryUsed differs from cache contents. Contributed by Jon Eagles (jeagles)."

This reverts commit 741fcf2c63.
This commit is contained in:
Eric Badger 2020-06-08 20:25:02 +00:00
parent fa723aa7f8
commit 890617c7ac
2 changed files with 56 additions and 95 deletions

View File

@ -72,12 +72,11 @@ public IndexRecord getIndexInformation(String mapId, int reduce,
try { try {
info.wait(); info.wait();
} catch (InterruptedException e) { } catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException("Interrupted waiting for construction", e); throw new IOException("Interrupted waiting for construction", e);
} }
} }
} }
LOG.debug("IndexCache HIT: MapId {} found", mapId); LOG.debug("IndexCache HIT: MapId " + mapId + " found");
} }
if (info.mapSpillRecord.size() == 0 || if (info.mapSpillRecord.size() == 0 ||
@ -107,91 +106,63 @@ private IndexInformation readIndexFileToCache(Path indexFileName,
try { try {
info.wait(); info.wait();
} catch (InterruptedException e) { } catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException("Interrupted waiting for construction", e); throw new IOException("Interrupted waiting for construction", e);
} }
} }
} }
LOG.debug("IndexCache HIT: MapId {} found", mapId); LOG.debug("IndexCache HIT: MapId " + mapId + " found");
return info; return info;
} }
LOG.debug("IndexCache MISS: MapId {} not found", mapId); LOG.debug("IndexCache MISS: MapId " + mapId + " not found") ;
SpillRecord tmp = null; SpillRecord tmp = null;
boolean success = false;
try { try {
tmp = new SpillRecord(indexFileName, conf, expectedIndexOwner); tmp = new SpillRecord(indexFileName, conf, expectedIndexOwner);
success = true;
} catch (Throwable e) { } catch (Throwable e) {
tmp = new SpillRecord(0); tmp = new SpillRecord(0);
cache.remove(mapId); cache.remove(mapId);
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
throw new IOException("Error Reading IndexFile", e); throw new IOException("Error Reading IndexFile", e);
} finally { } finally {
synchronized (newInd) { synchronized (newInd) {
newInd.mapSpillRecord = tmp; newInd.mapSpillRecord = tmp;
if (success) {
// Only add mapId to the queue for successful read and after added to
// the cache. Once in the queue, it is now eligible for removal once
// construction is finished.
queue.add(mapId);
if (totalMemoryUsed.addAndGet(newInd.getSize()) > totalMemoryAllowed) {
freeIndexInformation();
}
}
newInd.notifyAll(); newInd.notifyAll();
} }
} }
queue.add(mapId);
if (totalMemoryUsed.addAndGet(newInd.getSize()) > totalMemoryAllowed) {
freeIndexInformation();
}
return newInd; return newInd;
} }
/** /**
* This method removes the map from the cache if it is present in the queue. * This method removes the map from the cache if index information for this
* map is loaded(size>0), index information entry in cache will not be
* removed if it is in the loading phrase(size=0), this prevents corruption
* of totalMemoryUsed. It should be called when a map output on this tracker
* is discarded.
* @param mapId The taskID of this map. * @param mapId The taskID of this map.
*/ */
public void removeMap(String mapId) throws IOException { public void removeMap(String mapId) {
// Successfully removing the mapId from the queue enters into a contract IndexInformation info = cache.get(mapId);
// that this thread will remove the corresponding mapId from the cache. if (info == null || isUnderConstruction(info)) {
return;
}
info = cache.remove(mapId);
if (info != null) {
totalMemoryUsed.addAndGet(-info.getSize());
if (!queue.remove(mapId)) { if (!queue.remove(mapId)) {
LOG.debug("Map ID {} not found in queue", mapId); LOG.warn("Map ID" + mapId + " not found in queue!!");
return;
} }
removeMapInternal(mapId); } else {
} LOG.info("Map ID " + mapId + " not found in cache");
/** This method should only be called upon successful removal of mapId from
* the queue. The mapId will be removed from the cache and totalUsedMemory
* will be decremented.
* @param mapId the cache item to be removed
* @throws IOException
*/
private void removeMapInternal(String mapId) throws IOException {
IndexInformation info = cache.remove(mapId);
if (info == null) {
// Inconsistent state as presence in queue implies presence in cache
LOG.warn("Map ID " + mapId + " not found in cache");
return;
}
try {
synchronized(info) {
while (isUnderConstruction(info)) {
info.wait();
}
totalMemoryUsed.getAndAdd(-info.getSize());
}
} catch (InterruptedException e) {
totalMemoryUsed.getAndAdd(-info.getSize());
Thread.currentThread().interrupt();
throw new IOException("Interrupted waiting for construction", e);
} }
} }
/** /**
* This method checks if cache and totalMemoryUsed is consistent. * This method checks if cache and totolMemoryUsed is consistent.
* It is only used for unit test. * It is only used for unit test.
* @return True if cache and totalMemoryUsed is consistent * @return True if cache and totolMemoryUsed is consistent
*/ */
boolean checkTotalMemoryUsed() { boolean checkTotalMemoryUsed() {
int totalSize = 0; int totalSize = 0;
@ -204,13 +175,13 @@ boolean checkTotalMemoryUsed() {
/** /**
* Bring memory usage below totalMemoryAllowed. * Bring memory usage below totalMemoryAllowed.
*/ */
private synchronized void freeIndexInformation() throws IOException { private synchronized void freeIndexInformation() {
while (totalMemoryUsed.get() > totalMemoryAllowed) { while (totalMemoryUsed.get() > totalMemoryAllowed) {
if(queue.isEmpty()) { String s = queue.remove();
break; IndexInformation info = cache.remove(s);
if (info != null) {
totalMemoryUsed.addAndGet(-info.getSize());
} }
String mapId = queue.remove();
removeMapInternal(mapId);
} }
} }

View File

@ -21,7 +21,6 @@
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.util.Random; import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.zip.CRC32; import java.util.zip.CRC32;
import java.util.zip.CheckedOutputStream; import java.util.zip.CheckedOutputStream;
@ -218,31 +217,22 @@ public void testRemoveMap() throws Exception {
UserGroupInformation.getCurrentUser().getShortUserName(); UserGroupInformation.getCurrentUser().getShortUserName();
writeFile(fs, big, bytesPerFile, partsPerMap); writeFile(fs, big, bytesPerFile, partsPerMap);
// Capture if any runtime exception occurred
AtomicBoolean failed = new AtomicBoolean();
// run multiple times // run multiple times
for (int i = 0; i < 20; ++i) { for (int i = 0; i < 20; ++i) {
Thread getInfoThread = new Thread() { Thread getInfoThread = new Thread() {
@Override @Override
public void run() { public void run() {
try { try {
cache.getIndexInformation("bigIndex", 0, big, user); cache.getIndexInformation("bigIndex", partsPerMap, big, user);
} catch (Exception e) { } catch (Exception e) {
// should not be here // should not be here
failed.set(true);
} }
} }
}; };
Thread removeMapThread = new Thread() { Thread removeMapThread = new Thread() {
@Override @Override
public void run() { public void run() {
try {
cache.removeMap("bigIndex"); cache.removeMap("bigIndex");
} catch (Exception e) {
// should not be here
failed.set(true);
}
} }
}; };
if (i%2==0) { if (i%2==0) {
@ -254,7 +244,6 @@ public void run() {
} }
getInfoThread.join(); getInfoThread.join();
removeMapThread.join(); removeMapThread.join();
assertFalse("An unexpected exception", failed.get());
assertTrue(cache.checkTotalMemoryUsed()); assertTrue(cache.checkTotalMemoryUsed());
} }
} }
@ -272,9 +261,6 @@ public void testCreateRace() throws Exception {
UserGroupInformation.getCurrentUser().getShortUserName(); UserGroupInformation.getCurrentUser().getShortUserName();
writeFile(fs, racy, bytesPerFile, partsPerMap); writeFile(fs, racy, bytesPerFile, partsPerMap);
// Capture if any runtime exception occurred
AtomicBoolean failed = new AtomicBoolean();
// run multiple instances // run multiple instances
Thread[] getInfoThreads = new Thread[50]; Thread[] getInfoThreads = new Thread[50];
for (int i = 0; i < 50; i++) { for (int i = 0; i < 50; i++) {
@ -282,15 +268,10 @@ public void testCreateRace() throws Exception {
@Override @Override
public void run() { public void run() {
try { try {
while (!Thread.currentThread().isInterrupted()) { cache.getIndexInformation("racyIndex", partsPerMap, racy, user);
cache.getIndexInformation("racyIndex", 0, racy, user);
cache.removeMap("racyIndex"); cache.removeMap("racyIndex");
}
} catch (Exception e) { } catch (Exception e) {
if (!Thread.currentThread().isInterrupted()) {
// should not be here // should not be here
failed.set(true);
}
} }
} }
}; };
@ -300,12 +281,20 @@ public void run() {
getInfoThreads[i].start(); getInfoThreads[i].start();
} }
// The duration to keep the threads testing final Thread mainTestThread = Thread.currentThread();
Thread.sleep(5000);
for (int i = 0; i < 50; i++) { Thread timeoutThread = new Thread() {
getInfoThreads[i].interrupt(); @Override
public void run() {
try {
Thread.sleep(15000);
mainTestThread.interrupt();
} catch (InterruptedException ie) {
// we are done;
} }
}
};
for (int i = 0; i < 50; i++) { for (int i = 0; i < 50; i++) {
try { try {
getInfoThreads[i].join(); getInfoThreads[i].join();
@ -314,9 +303,10 @@ public void run() {
fail("Unexpectedly long delay during concurrent cache entry creations"); fail("Unexpectedly long delay during concurrent cache entry creations");
} }
} }
assertFalse("An unexpected exception", failed.get()); // stop the timeoutThread. If we get interrupted before stopping, there
assertTrue("Total memory used does not represent contents of the cache", // must be something wrong, although it wasn't a deadlock. No need to
cache.checkTotalMemoryUsed()); // catch and swallow.
timeoutThread.interrupt();
} }
private static void checkRecord(IndexRecord rec, long fill) { private static void checkRecord(IndexRecord rec, long fill) {