From db73cc9124bb8511c1626ba40d3fad81e980e44f Mon Sep 17 00:00:00 2001 From: Karthik Kambatla Date: Mon, 8 Dec 2014 22:18:32 -0800 Subject: [PATCH] YARN-2931. PublicLocalizer may fail until directory is initialized by LocalizeRunner. (Anubhav Dhoot via kasha) --- hadoop-yarn-project/CHANGES.txt | 3 + .../ResourceLocalizationService.java | 6 + .../TestResourceLocalizationService.java | 110 +++++++++++++++++- 3 files changed, 113 insertions(+), 6 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 43b19ec515..d06c8312c5 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -200,6 +200,9 @@ Release 2.7.0 - UNRELEASED YARN-2927. [YARN-1492] InMemorySCMStore properties are inconsistent. (Ray Chiang via kasha) + YARN-2931. PublicLocalizer may fail until directory is initialized by + LocalizeRunner. (Anubhav Dhoot via kasha) + Release 2.6.0 - 2014-11-18 INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java index f4b6221a7b..5440980590 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java @@ -775,6 +775,12 @@ public class ResourceLocalizationService extends CompositeService if (!publicDirDestPath.getParent().equals(publicRootPath)) { DiskChecker.checkDir(new File(publicDirDestPath.toUri().getPath())); } + + // In case this is not a newly initialized nm state, ensure + // initialized local/log dirs similar to LocalizerRunner + getInitializedLocalDirs(); + getInitializedLogDirs(); + // explicitly synchronize pending here to avoid future task // completing and being dequeued before pending updated synchronized (pending) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java index 1051e7acfc..f968bb9f39 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java @@ -32,18 +32,16 @@ import static org.mockito.Matchers.eq; import static org.mockito.Matchers.isA; import static org.mockito.Matchers.isNull; import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; import java.io.File; -import java.io.FileNotFoundException; import java.io.IOException; import java.net.InetSocketAddress; import java.net.URI; @@ -63,10 +61,7 @@ import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.Future; -import org.apache.hadoop.fs.FileAlreadyExistsException; import org.apache.hadoop.fs.Options; -import org.apache.hadoop.fs.UnresolvedLinkException; -import org.apache.hadoop.security.AccessControlException; import org.junit.Assert; import org.apache.commons.io.FileUtils; @@ -940,6 +935,109 @@ public class TestResourceLocalizationService { } } + @Test + @SuppressWarnings("unchecked") + public void testPublicResourceInitializesLocalDir() throws Exception { + + // Setup state to simulate restart NM with existing state meaning no + // directory creation during initialization + NMStateStoreService spyStateStore = spy(nmContext.getNMStateStore()); + when(spyStateStore.canRecover()).thenReturn(true); + NMContext spyContext = spy(nmContext); + when(spyContext.getNMStateStore()).thenReturn(spyStateStore); + + List localDirs = new ArrayList(); + String[] sDirs = new String[4]; + for (int i = 0; i < 4; ++i) { + localDirs.add(lfs.makeQualified(new Path(basedir, i + ""))); + sDirs[i] = localDirs.get(i).toString(); + } + conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs); + + + DrainDispatcher dispatcher = new DrainDispatcher(); + EventHandler applicationBus = mock(EventHandler.class); + dispatcher.register(ApplicationEventType.class, applicationBus); + EventHandler containerBus = mock(EventHandler.class); + dispatcher.register(ContainerEventType.class, containerBus); + + ContainerExecutor exec = mock(ContainerExecutor.class); + DeletionService delService = mock(DeletionService.class); + LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService(); + dirsHandler.init(conf); + + dispatcher.init(conf); + dispatcher.start(); + + try { + ResourceLocalizationService rawService = + new ResourceLocalizationService(dispatcher, exec, delService, + dirsHandler, spyContext); + ResourceLocalizationService spyService = spy(rawService); + doReturn(mockServer).when(spyService).createServer(); + doReturn(lfs).when(spyService).getLocalFileContext( + isA(Configuration.class)); + + spyService.init(conf); + spyService.start(); + + final FsPermission defaultPerm = new FsPermission((short)0755); + + // verify directory is not created at initialization + for (Path p : localDirs) { + p = new Path((new URI(p.toString())).getPath()); + Path publicCache = new Path(p, ContainerLocalizer.FILECACHE); + verify(spylfs, never()) + .mkdir(eq(publicCache),eq(defaultPerm), eq(true)); + } + + final String user = "user0"; + // init application + final Application app = mock(Application.class); + final ApplicationId appId = + BuilderUtils.newApplicationId(314159265358979L, 3); + when(app.getUser()).thenReturn(user); + when(app.getAppId()).thenReturn(appId); + spyService.handle(new ApplicationLocalizationEvent( + LocalizationEventType.INIT_APPLICATION_RESOURCES, app)); + dispatcher.await(); + + // init container. + final Container c = getMockContainer(appId, 42, user); + + // init resources + Random r = new Random(); + long seed = r.nextLong(); + System.out.println("SEED: " + seed); + r.setSeed(seed); + + // Queue up public resource localization + final LocalResource pubResource = getPublicMockedResource(r); + final LocalResourceRequest pubReq = new LocalResourceRequest(pubResource); + + Map> req = + new HashMap>(); + req.put(LocalResourceVisibility.PUBLIC, + Collections.singletonList(pubReq)); + + Set pubRsrcs = new HashSet(); + pubRsrcs.add(pubReq); + + spyService.handle(new ContainerLocalizationRequestEvent(c, req)); + dispatcher.await(); + + // verify directory creation + for (Path p : localDirs) { + p = new Path((new URI(p.toString())).getPath()); + Path publicCache = new Path(p, ContainerLocalizer.FILECACHE); + verify(spylfs).mkdir(eq(publicCache),eq(defaultPerm), eq(true)); + } + } finally { + dispatcher.stop(); + } + } + @Test(timeout=20000) @SuppressWarnings("unchecked") // mocked generics public void testFailedPublicResource() throws Exception {