MAPREDUCE-7094. LocalDistributedCacheManager leaves classloaders open, which leaks FDs. Contributed by Adam Szita.

This commit is contained in:
Miklos Szegedi 2018-05-17 10:13:43 -07:00
parent cc3600aabd
commit a2cdffb95a
2 changed files with 36 additions and 9 deletions

View File

@ -73,6 +73,7 @@ class LocalDistributedCacheManager {
private List<String> localClasspaths = new ArrayList<String>(); private List<String> localClasspaths = new ArrayList<String>();
private List<File> symlinksCreated = new ArrayList<File>(); private List<File> symlinksCreated = new ArrayList<File>();
private URLClassLoader classLoaderCreated = null;
private boolean setupCalled = false; private boolean setupCalled = false;
@ -82,7 +83,7 @@ class LocalDistributedCacheManager {
* @param conf * @param conf
* @throws IOException * @throws IOException
*/ */
public void setup(JobConf conf, JobID jobId) throws IOException { public synchronized void setup(JobConf conf, JobID jobId) throws IOException {
File workDir = new File(System.getProperty("user.dir")); File workDir = new File(System.getProperty("user.dir"));
// Generate YARN local resources objects corresponding to the distributed // Generate YARN local resources objects corresponding to the distributed
@ -212,7 +213,7 @@ private void symlink(File workDir, String target, String link)
* Should be called after setup(). * Should be called after setup().
* *
*/ */
public boolean hasLocalClasspaths() { public synchronized boolean hasLocalClasspaths() {
if (!setupCalled) { if (!setupCalled) {
throw new IllegalStateException( throw new IllegalStateException(
"hasLocalClasspaths() should be called after setup()"); "hasLocalClasspaths() should be called after setup()");
@ -224,8 +225,11 @@ public boolean hasLocalClasspaths() {
* Creates a class loader that includes the designated * Creates a class loader that includes the designated
* files and archives. * files and archives.
*/ */
public ClassLoader makeClassLoader(final ClassLoader parent) public synchronized ClassLoader makeClassLoader(final ClassLoader parent)
throws MalformedURLException { throws MalformedURLException {
if (classLoaderCreated != null) {
throw new IllegalStateException("A classloader was already created");
}
final URL[] urls = new URL[localClasspaths.size()]; final URL[] urls = new URL[localClasspaths.size()];
for (int i = 0; i < localClasspaths.size(); ++i) { for (int i = 0; i < localClasspaths.size(); ++i) {
urls[i] = new File(localClasspaths.get(i)).toURI().toURL(); urls[i] = new File(localClasspaths.get(i)).toURI().toURL();
@ -234,12 +238,29 @@ public ClassLoader makeClassLoader(final ClassLoader parent)
return AccessController.doPrivileged(new PrivilegedAction<ClassLoader>() { return AccessController.doPrivileged(new PrivilegedAction<ClassLoader>() {
@Override @Override
public ClassLoader run() { public ClassLoader run() {
return new URLClassLoader(urls, parent); classLoaderCreated = new URLClassLoader(urls, parent);
return classLoaderCreated;
} }
}); });
} }
public void close() throws IOException { public synchronized void close() throws IOException {
if(classLoaderCreated != null) {
AccessController.doPrivileged(new PrivilegedAction<Void>() {
@Override
public Void run() {
try {
classLoaderCreated.close();
classLoaderCreated = null;
} catch (IOException e) {
LOG.warn("Failed to close classloader created " +
"by LocalDistributedCacheManager");
}
return null;
}
});
}
for (File symlink : symlinksCreated) { for (File symlink : symlinksCreated) {
if (!symlink.delete()) { if (!symlink.delete()) {
LOG.warn("Failed to delete symlink created by the local job runner: " + LOG.warn("Failed to delete symlink created by the local job runner: " +

View File

@ -593,10 +593,16 @@ public void run() {
} finally { } finally {
try { try {
fs.delete(systemJobFile.getParent(), true); // delete submit dir try {
localFs.delete(localJobFile, true); // delete local copy // Cleanup distributed cache
// Cleanup distributed cache localDistributedCacheManager.close();
localDistributedCacheManager.close(); } finally {
try {
fs.delete(systemJobFile.getParent(), true); // delete submit dir
} finally {
localFs.delete(localJobFile, true); // delete local copy
}
}
} catch (IOException e) { } catch (IOException e) {
LOG.warn("Error cleaning up "+id+": "+e); LOG.warn("Error cleaning up "+id+": "+e);
} }