diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index c30dc4dc01..bb300db26d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -226,6 +226,8 @@ public enum DSEntity { @VisibleForTesting UserGroupInformation appSubmitterUgi; + private Path homeDirectory; + // Handle to communicate with the Node Manager private NMClientAsync nmClientAsync; // Listen to process the response from the Node Manager @@ -513,6 +515,7 @@ public boolean init(String[] args) throws ParseException, IOException { + "retrieved by" + " the new application attempt "); opts.addOption("localized_files", true, "List of localized files"); + opts.addOption("homedir", true, "Home Directory of Job Owner"); opts.addOption("help", false, "Print usage"); CommandLine cliParser = new GnuParser().parse(opts, args); @@ -544,6 +547,11 @@ public boolean init(String[] args) throws ParseException, IOException { dumpOutDebugInfo(); } + homeDirectory = cliParser.hasOption("homedir") ? + new Path(cliParser.getOptionValue("homedir")) : + new Path("/user/" + System.getenv(ApplicationConstants. + Environment.USER.name())); + if (cliParser.hasOption("placement_spec")) { String placementSpec = cliParser.getOptionValue("placement_spec"); String decodedSpec = getDecodedPlacementSpec(placementSpec); @@ -779,7 +787,7 @@ private void cleanup() { @Override public Void run() throws IOException { FileSystem fs = FileSystem.get(conf); - Path dst = new Path(getAppSubmitterHomeDir(), + Path dst = new Path(homeDirectory, getRelativePath(appName, appId.toString(), "")); fs.delete(dst, true); return null; @@ -790,11 +798,6 @@ public Void run() throws IOException { } } - private Path getAppSubmitterHomeDir() { - return new Path("/user/" + - System.getenv(ApplicationConstants.Environment.USER.name())); - } - /** * Main run function for the application master * @@ -1495,7 +1498,7 @@ public void run() { String relativePath = getRelativePath(appName, appId.toString(), fileName); Path dst = - new Path(getAppSubmitterHomeDir(), relativePath); + new Path(homeDirectory, relativePath); FileStatus fileStatus = fs.getFileStatus(dst); LocalResource localRes = LocalResource.newInstance( URL.fromURI(dst.toUri()), diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java index 08c6b83797..4bd57dd27f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java @@ -986,6 +986,8 @@ public boolean run() throws IOException, YarnException { } vargs.add("--appname " + appName); + vargs.add("--homedir " + fs.getHomeDirectory()); + vargs.addAll(containerRetryOptions); vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stdout"); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java index ba574952f7..5a4de1f4b6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java @@ -273,11 +273,7 @@ private void publishContainerResumedEvent( TimelineEvent tEvent = new TimelineEvent(); tEvent.setId(ContainerMetricsConstants.RESUMED_EVENT_TYPE); tEvent.setTimestamp(event.getTimestamp()); - - long containerStartTime = container.getContainerStartTime(); entity.addEvent(tEvent); - entity - .setIdPrefix(TimelineServiceHelper.invertLong(containerStartTime)); dispatcher.getEventHandler().handle(new TimelinePublishEvent(entity, containerId.getApplicationAttemptId().getApplicationId())); } @@ -302,11 +298,7 @@ private void publishContainerPausedEvent( TimelineEvent tEvent = new TimelineEvent(); tEvent.setId(ContainerMetricsConstants.PAUSED_EVENT_TYPE); tEvent.setTimestamp(event.getTimestamp()); - - long containerStartTime = container.getContainerStartTime(); entity.addEvent(tEvent); - entity - .setIdPrefix(TimelineServiceHelper.invertLong(containerStartTime)); dispatcher.getEventHandler().handle(new TimelinePublishEvent(entity, containerId.getApplicationAttemptId().getApplicationId())); } @@ -333,11 +325,7 @@ private void publishContainerKilledEvent( TimelineEvent tEvent = new TimelineEvent(); tEvent.setId(ContainerMetricsConstants.KILLED_EVENT_TYPE); tEvent.setTimestamp(event.getTimestamp()); - - long containerStartTime = container.getContainerStartTime(); entity.addEvent(tEvent); - entity - .setIdPrefix(TimelineServiceHelper.invertLong(containerStartTime)); dispatcher.getEventHandler().handle(new TimelinePublishEvent(entity, containerId.getApplicationAttemptId().getApplicationId())); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/security/TestTimelineAuthFilterForV2.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/security/TestTimelineAuthFilterForV2.java index 95a008a875..0c70a5afda 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/security/TestTimelineAuthFilterForV2.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/security/TestTimelineAuthFilterForV2.java @@ -34,14 +34,15 @@ import java.io.BufferedReader; import java.io.File; +import java.io.FileFilter; import java.io.FileReader; import java.io.IOException; import java.util.Arrays; import java.util.Collection; import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.Callable; -import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.FileSystem; @@ -100,6 +101,8 @@ public class TestTimelineAuthFilterForV2 { getKeytabFile()); private static String httpSpnegoPrincipal = KerberosTestUtils. getServerPrincipal(); + private static final String ENTITY_TYPE = "dummy_type"; + private static final AtomicInteger ENTITY_TYPE_SUFFIX = new AtomicInteger(0); // First param indicates whether HTTPS access or HTTP access and second param // indicates whether it is kerberos access or token based access. @@ -274,11 +277,20 @@ private static TimelineEntity createEntity(String id, String type) { } private static void verifyEntity(File entityTypeDir, String id, String type) - throws IOException { + throws InterruptedException, IOException { File entityFile = new File(entityTypeDir, id + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION); + TimelineEntity entity = null; + for (int i = 0; i < 50; i++) { + if (entityFile.exists()) { + entity = readEntityFile(entityFile); + if (entity != null) { + break; + } + } + Thread.sleep(50); + } assertTrue(entityFile.exists()); - TimelineEntity entity = readEntityFile(entityFile); assertNotNull(entity); assertEquals(id, entity.getId()); assertEquals(type, entity.getType()); @@ -333,7 +345,8 @@ private boolean publishWithRetries(ApplicationId appId, File entityTypeDir, @Test public void testPutTimelineEntities() throws Exception { - final String entityType = "dummy_type"; + final String entityType = ENTITY_TYPE + + ENTITY_TYPE_SUFFIX.getAndIncrement(); ApplicationId appId = ApplicationId.newInstance(0, 1); File entityTypeDir = new File(TEST_ROOT_DIR.getAbsolutePath() + File.separator + "entities" + File.separator + @@ -342,92 +355,92 @@ public void testPutTimelineEntities() throws Exception { File.separator + "test_flow_name" + File.separator + "test_flow_version" + File.separator + "1" + File.separator + appId.toString() + File.separator + entityType); - try { - if (withKerberosLogin) { - KerberosTestUtils.doAs(HTTP_USER + "/localhost", new Callable() { - @Override - public Void call() throws Exception { - publishAndVerifyEntity(appId, entityTypeDir, entityType, 1); - return null; - } - }); - } else { - assertTrue("Entities should have been published successfully.", - publishWithRetries(appId, entityTypeDir, entityType, 1)); - - AppLevelTimelineCollector collector = - (AppLevelTimelineCollector) collectorManager.get(appId); - Token token = - collector.getDelegationTokenForApp(); - assertNotNull(token); - - // Verify if token is renewed automatically and entities can still be - // published. - Thread.sleep(1000); - // Entities should publish successfully after renewal. - assertTrue("Entities should have been published successfully.", - publishWithRetries(appId, entityTypeDir, entityType, 2)); - assertNotNull(collector); - verify(collectorManager.getTokenManagerService(), atLeastOnce()). - renewToken(eq(collector.getDelegationTokenForApp()), - any(String.class)); - - // Wait to ensure lifetime of token expires and ensure its regenerated - // automatically. - Thread.sleep(3000); - for (int i = 0; i < 40; i++) { - if (!token.equals(collector.getDelegationTokenForApp())) { - break; - } - Thread.sleep(50); - } - assertNotEquals("Token should have been regenerated.", token, - collector.getDelegationTokenForApp()); - Thread.sleep(1000); - // Try publishing with the old token in UGI. Publishing should fail due - // to invalid token. - try { - publishAndVerifyEntity(appId, entityTypeDir, entityType, 2); - fail("Exception should have been thrown due to Invalid Token."); - } catch (YarnException e) { - assertTrue("Exception thrown should have been due to Invalid Token.", - e.getCause().getMessage().contains("InvalidToken")); + if (withKerberosLogin) { + KerberosTestUtils.doAs(HTTP_USER + "/localhost", new Callable() { + @Override + public Void call() throws Exception { + publishAndVerifyEntity(appId, entityTypeDir, entityType, 1); + return null; } + }); + } else { + assertTrue("Entities should have been published successfully.", + publishWithRetries(appId, entityTypeDir, entityType, 1)); - // Update the regenerated token in UGI and retry publishing entities. - Token regeneratedToken = - collector.getDelegationTokenForApp(); - regeneratedToken.setService(new Text("localhost" + - regeneratedToken.getService().toString().substring( - regeneratedToken.getService().toString().indexOf(":")))); - UserGroupInformation.getCurrentUser().addToken(regeneratedToken); - assertTrue("Entities should have been published successfully.", - publishWithRetries(appId, entityTypeDir, entityType, 2)); - // Token was generated twice, once when app collector was created and - // later after token lifetime expiry. - verify(collectorManager.getTokenManagerService(), times(2)). - generateToken(any(UserGroupInformation.class), any(String.class)); - assertEquals(1, ((DummyNodeTimelineCollectorManager) collectorManager). - getTokenExpiredCnt()); - } - // Wait for async entity to be published. - for (int i = 0; i < 50; i++) { - if (entityTypeDir.listFiles().length == 2) { + AppLevelTimelineCollector collector = + (AppLevelTimelineCollector) collectorManager.get(appId); + Token token = + collector.getDelegationTokenForApp(); + assertNotNull(token); + + // Verify if token is renewed automatically and entities can still be + // published. + Thread.sleep(1000); + // Entities should publish successfully after renewal. + assertTrue("Entities should have been published successfully.", + publishWithRetries(appId, entityTypeDir, entityType, 2)); + assertNotNull(collector); + verify(collectorManager.getTokenManagerService(), atLeastOnce()). + renewToken(eq(collector.getDelegationTokenForApp()), + any(String.class)); + + // Wait to ensure lifetime of token expires and ensure its regenerated + // automatically. + Thread.sleep(3000); + for (int i = 0; i < 40; i++) { + if (!token.equals(collector.getDelegationTokenForApp())) { break; } Thread.sleep(50); } - assertEquals(2, entityTypeDir.listFiles().length); - verifyEntity(entityTypeDir, "entity2", entityType); - AppLevelTimelineCollector collector = - (AppLevelTimelineCollector)collectorManager.get(appId); - assertNotNull(collector); - auxService.removeApplication(appId); - verify(collectorManager.getTokenManagerService()).cancelToken( - eq(collector.getDelegationTokenForApp()), any(String.class)); - } finally { - FileUtils.deleteQuietly(entityTypeDir); + assertNotEquals("Token should have been regenerated.", token, + collector.getDelegationTokenForApp()); + Thread.sleep(1000); + // Try publishing with the old token in UGI. Publishing should fail due + // to invalid token. + try { + publishAndVerifyEntity(appId, entityTypeDir, entityType, 2); + fail("Exception should have been thrown due to Invalid Token."); + } catch (YarnException e) { + assertTrue("Exception thrown should have been due to Invalid Token.", + e.getCause().getMessage().contains("InvalidToken")); + } + + // Update the regenerated token in UGI and retry publishing entities. + Token regeneratedToken = + collector.getDelegationTokenForApp(); + regeneratedToken.setService(new Text("localhost" + + regeneratedToken.getService().toString().substring( + regeneratedToken.getService().toString().indexOf(":")))); + UserGroupInformation.getCurrentUser().addToken(regeneratedToken); + assertTrue("Entities should have been published successfully.", + publishWithRetries(appId, entityTypeDir, entityType, 2)); + // Token was generated twice, once when app collector was created and + // later after token lifetime expiry. + verify(collectorManager.getTokenManagerService(), times(2)). + generateToken(any(UserGroupInformation.class), any(String.class)); + assertEquals(1, ((DummyNodeTimelineCollectorManager) collectorManager). + getTokenExpiredCnt()); } + // Wait for async entity to be published. + FileFilter tmpFilter = (pathname -> !pathname.getName().endsWith(".tmp")); + File[] entities = null; + for (int i = 0; i < 50; i++) { + entities = entityTypeDir.listFiles(tmpFilter); + if (entities != null && entities.length == 2) { + break; + } + Thread.sleep(50); + } + assertNotNull("Error reading entityTypeDir", entities); + assertEquals(2, entities.length); + verifyEntity(entityTypeDir, "entity2", entityType); + AppLevelTimelineCollector collector = + (AppLevelTimelineCollector)collectorManager.get(appId); + assertNotNull(collector); + auxService.removeApplication(appId); + verify(collectorManager.getTokenManagerService()).cancelToken( + eq(collector.getDelegationTokenForApp()), any(String.class)); } private static class DummyNodeTimelineCollectorManager extends