From b72507810aece08e17ab4b5aae1f7eae1fe98609 Mon Sep 17 00:00:00 2001 From: Robert Kanter Date: Wed, 6 May 2015 14:19:06 -0700 Subject: [PATCH] YARN-3491. PublicLocalizer#addResource is too slow. (zxu via rkanter) --- hadoop-yarn-project/CHANGES.txt | 2 + .../nodemanager/DirectoryCollection.java | 33 +++++++++++- .../nodemanager/LocalDirsHandlerService.java | 17 +++++++ .../ResourceLocalizationService.java | 51 +++++++++---------- .../nodemanager/TestDirectoryCollection.java | 47 +++++++++++++++++ .../TestResourceLocalizationService.java | 28 +++++++--- 6 files changed, 142 insertions(+), 36 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 3a8a6a3b4b..dea44821c4 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -180,6 +180,8 @@ Release 2.8.0 - UNRELEASED YARN-3396. Handle URISyntaxException in ResourceLocalizationService. (Brahma Reddy Battula via junping_du) + YARN-3491. PublicLocalizer#addResource is too slow. (zxu via rkanter) + OPTIMIZATIONS YARN-3339. TestDockerContainerExecutor should pull a single image and not diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DirectoryCollection.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DirectoryCollection.java index 26589188c4..32046c5fdf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DirectoryCollection.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DirectoryCollection.java @@ -42,9 +42,12 @@ /** * Manages a list of local storage directories. */ -class DirectoryCollection { +public class DirectoryCollection { private static final Log LOG = LogFactory.getLog(DirectoryCollection.class); + /** + * The enum defines disk failure type. + */ public enum DiskErrorCause { DISK_FULL, OTHER } @@ -59,6 +62,13 @@ static class DiskErrorInformation { } } + /** + * The interface provides a callback when localDirs is changed. + */ + public interface DirsChangeListener { + void onDirsChanged(); + } + /** * Returns a merged list which contains all the elements of l1 and l2 * @param l1 the first list to be included @@ -84,6 +94,8 @@ static List concat(List l1, List l2) { private int goodDirsDiskUtilizationPercentage; + private Set dirsChangeListeners; + /** * Create collection for the directories specified. No check for free space. * @@ -154,6 +166,20 @@ public DirectoryCollection(String[] dirs, : utilizationPercentageCutOff); diskUtilizationSpaceCutoff = utilizationSpaceCutOff < 0 ? 0 : utilizationSpaceCutOff; + + dirsChangeListeners = new HashSet(); + } + + synchronized void registerDirsChangeListener( + DirsChangeListener listener) { + if (dirsChangeListeners.add(listener)) { + listener.onDirsChanged(); + } + } + + synchronized void deregisterDirsChangeListener( + DirsChangeListener listener) { + dirsChangeListeners.remove(listener); } /** @@ -280,6 +306,11 @@ synchronized boolean checkDirs() { } } setGoodDirsDiskUtilizationPercentage(); + if (setChanged) { + for (DirsChangeListener listener : dirsChangeListeners) { + listener.onDirsChanged(); + } + } return setChanged; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java index 493571dc81..57d4395e32 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java @@ -38,6 +38,7 @@ import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.nodemanager.DirectoryCollection.DirsChangeListener; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; /** @@ -192,6 +193,22 @@ protected void serviceStop() throws Exception { super.serviceStop(); } + public void registerLocalDirsChangeListener(DirsChangeListener listener) { + localDirs.registerDirsChangeListener(listener); + } + + public void registerLogDirsChangeListener(DirsChangeListener listener) { + logDirs.registerDirsChangeListener(listener); + } + + public void deregisterLocalDirsChangeListener(DirsChangeListener listener) { + localDirs.deregisterDirsChangeListener(listener); + } + + public void deregisterLogDirsChangeListener(DirsChangeListener listener) { + logDirs.deregisterDirsChangeListener(listener); + } + /** * @return the good/valid local directories based on disks' health */ diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java index 17ea1a9458..603e79558d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java @@ -92,6 +92,7 @@ import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.DeletionService.FileDeletionTask; +import org.apache.hadoop.yarn.server.nodemanager.DirectoryCollection.DirsChangeListener; import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocol; import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec; @@ -161,6 +162,8 @@ public class ResourceLocalizationService extends CompositeService private LocalResourcesTracker publicRsrc; private LocalDirsHandlerService dirsHandler; + private DirsChangeListener localDirsChangeListener; + private DirsChangeListener logDirsChangeListener; private Context nmContext; /** @@ -254,6 +257,18 @@ public void serviceInit(Configuration conf) throws Exception { localizerTracker = createLocalizerTracker(conf); addService(localizerTracker); dispatcher.register(LocalizerEventType.class, localizerTracker); + localDirsChangeListener = new DirsChangeListener() { + @Override + public void onDirsChanged() { + checkAndInitializeLocalDirs(); + } + }; + logDirsChangeListener = new DirsChangeListener() { + @Override + public void onDirsChanged() { + initializeLogDirs(lfs); + } + }; super.serviceInit(conf); } @@ -345,6 +360,8 @@ public void serviceStart() throws Exception { server.getListenerAddress()); LOG.info("Localizer started on port " + server.getPort()); super.serviceStart(); + dirsHandler.registerLocalDirsChangeListener(localDirsChangeListener); + dirsHandler.registerLogDirsChangeListener(logDirsChangeListener); } LocalizerTracker createLocalizerTracker(Configuration conf) { @@ -375,6 +392,8 @@ Server createServer() { @Override public void serviceStop() throws Exception { + dirsHandler.deregisterLocalDirsChangeListener(localDirsChangeListener); + dirsHandler.deregisterLogDirsChangeListener(logDirsChangeListener); if (server != null) { server.stop(); } @@ -814,11 +833,6 @@ public void addResource(LocalizerResourceRequestEvent request) { DiskChecker.checkDir(new File(publicDirDestPath.toUri().getPath())); } - // In case this is not a newly initialized nm state, ensure - // initialized local/log dirs similar to LocalizerRunner - getInitializedLocalDirs(); - getInitializedLogDirs(); - // explicitly synchronize pending here to avoid future task // completing and being dequeued before pending updated synchronized (pending) { @@ -1120,8 +1134,6 @@ public void run() { // 1) write credentials to private dir writeCredentials(nmPrivateCTokensPath); // 2) exec initApplication and wait - List localDirs = getInitializedLocalDirs(); - List logDirs = getInitializedLogDirs(); if (dirsHandler.areDisksHealthy()) { exec.startLocalizer(nmPrivateCTokensPath, localizationServerAddress, context.getUser(), @@ -1387,13 +1399,12 @@ private void cleanUpFilesPerUserDir(FileContext lfs, DeletionService del, } /** - * Synchronized method to get a list of initialized local dirs. Method will - * check each local dir to ensure it has been setup correctly and will attempt - * to fix any issues it finds. - * - * @return list of initialized local dirs + * Check each local dir to ensure it has been setup correctly and will + * attempt to fix any issues it finds. + * @return void */ - synchronized private List getInitializedLocalDirs() { + @VisibleForTesting + void checkAndInitializeLocalDirs() { List dirs = dirsHandler.getLocalDirs(); List checkFailedDirs = new ArrayList(); for (String dir : dirs) { @@ -1415,7 +1426,6 @@ synchronized private List getInitializedLocalDirs() { throw new YarnRuntimeException(msg, e); } } - return dirs; } private boolean checkLocalDir(String localDir) { @@ -1463,17 +1473,4 @@ private Map getLocalDirsPathPermissionsMap(String localDir) localDirPathFsPermissionsMap.put(sysDir, nmPrivatePermission); return localDirPathFsPermissionsMap; } - - /** - * Synchronized method to get a list of initialized log dirs. Method will - * check each local dir to ensure it has been setup correctly and will attempt - * to fix any issues it finds. - * - * @return list of initialized log dirs - */ - synchronized private List getInitializedLogDirs() { - List dirs = dirsHandler.getLogDirs(); - initializeLogDirs(lfs); - return dirs; - } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDirectoryCollection.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDirectoryCollection.java index e4525a570f..2fd89c601d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDirectoryCollection.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDirectoryCollection.java @@ -31,6 +31,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.nodemanager.DirectoryCollection.DirsChangeListener; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -258,4 +259,50 @@ public void testConstructors() { Assert.assertEquals(100.0F, dc.getDiskUtilizationPercentageCutoff(), delta); Assert.assertEquals(0, dc.getDiskUtilizationSpaceCutoff()); } + + @Test + public void testDirsChangeListener() { + DirsChangeListenerTest listener1 = new DirsChangeListenerTest(); + DirsChangeListenerTest listener2 = new DirsChangeListenerTest(); + DirsChangeListenerTest listener3 = new DirsChangeListenerTest(); + + String dirA = new File(testDir, "dirA").getPath(); + String[] dirs = { dirA }; + DirectoryCollection dc = new DirectoryCollection(dirs, 0.0F); + Assert.assertEquals(1, dc.getGoodDirs().size()); + Assert.assertEquals(listener1.num, 0); + Assert.assertEquals(listener2.num, 0); + Assert.assertEquals(listener3.num, 0); + dc.registerDirsChangeListener(listener1); + dc.registerDirsChangeListener(listener2); + dc.registerDirsChangeListener(listener3); + Assert.assertEquals(listener1.num, 1); + Assert.assertEquals(listener2.num, 1); + Assert.assertEquals(listener3.num, 1); + + dc.deregisterDirsChangeListener(listener3); + dc.checkDirs(); + Assert.assertEquals(0, dc.getGoodDirs().size()); + Assert.assertEquals(listener1.num, 2); + Assert.assertEquals(listener2.num, 2); + Assert.assertEquals(listener3.num, 1); + + dc.deregisterDirsChangeListener(listener2); + dc.setDiskUtilizationPercentageCutoff(100.0F); + dc.checkDirs(); + Assert.assertEquals(1, dc.getGoodDirs().size()); + Assert.assertEquals(listener1.num, 3); + Assert.assertEquals(listener2.num, 2); + Assert.assertEquals(listener3.num, 1); + } + + static class DirsChangeListenerTest implements DirsChangeListener { + public int num = 0; + public DirsChangeListenerTest() { + } + @Override + public void onDirsChanged() { + num++; + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java index 2edaf45877..07001ad835 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java @@ -1098,7 +1098,6 @@ public void testPublicResourceInitializesLocalDir() throws Exception { isA(Configuration.class)); spyService.init(conf); - spyService.start(); final FsPermission defaultPerm = new FsPermission((short)0755); @@ -1110,6 +1109,8 @@ public void testPublicResourceInitializesLocalDir() throws Exception { .mkdir(eq(publicCache),eq(defaultPerm), eq(true)); } + spyService.start(); + final String user = "user0"; // init application final Application app = mock(Application.class); @@ -1131,21 +1132,32 @@ public void testPublicResourceInitializesLocalDir() throws Exception { r.setSeed(seed); // Queue up public resource localization - final LocalResource pubResource = getPublicMockedResource(r); - final LocalResourceRequest pubReq = new LocalResourceRequest(pubResource); + final LocalResource pubResource1 = getPublicMockedResource(r); + final LocalResourceRequest pubReq1 = + new LocalResourceRequest(pubResource1); + + LocalResource pubResource2 = null; + do { + pubResource2 = getPublicMockedResource(r); + } while (pubResource2 == null || pubResource2.equals(pubResource1)); + // above call to make sure we don't get identical resources. + final LocalResourceRequest pubReq2 = + new LocalResourceRequest(pubResource2); + + Set pubRsrcs = new HashSet(); + pubRsrcs.add(pubReq1); + pubRsrcs.add(pubReq2); Map> req = new HashMap>(); - req.put(LocalResourceVisibility.PUBLIC, - Collections.singletonList(pubReq)); - - Set pubRsrcs = new HashSet(); - pubRsrcs.add(pubReq); + req.put(LocalResourceVisibility.PUBLIC, pubRsrcs); spyService.handle(new ContainerLocalizationRequestEvent(c, req)); dispatcher.await(); + verify(spyService, times(1)).checkAndInitializeLocalDirs(); + // verify directory creation for (Path p : localDirs) { p = new Path((new URI(p.toString())).getPath());