YARN-9452. Fix TestDistributedShell and TestTimelineAuthFilterForV2 failures. Contributed by Prabhu Joseph.
parent baee71551d
commit 30c6dd92e1
@@ -226,6 +226,8 @@ public enum DSEntity {
@VisibleForTesting
UserGroupInformation appSubmitterUgi;

private Path homeDirectory;

// Handle to communicate with the Node Manager
private NMClientAsync nmClientAsync;
// Listen to process the response from the Node Manager
@@ -513,6 +515,7 @@ public boolean init(String[] args) throws ParseException, IOException {
+ "retrieved by"
+ " the new application attempt ");
opts.addOption("localized_files", true, "List of localized files");
opts.addOption("homedir", true, "Home Directory of Job Owner");

opts.addOption("help", false, "Print usage");
CommandLine cliParser = new GnuParser().parse(opts, args);
@@ -544,6 +547,11 @@ public boolean init(String[] args) throws ParseException, IOException {
dumpOutDebugInfo();
}

homeDirectory = cliParser.hasOption("homedir") ?
new Path(cliParser.getOptionValue("homedir")) :
new Path("/user/" + System.getenv(ApplicationConstants.
Environment.USER.name()));

if (cliParser.hasOption("placement_spec")) {
String placementSpec = cliParser.getOptionValue("placement_spec");
String decodedSpec = getDecodedPlacementSpec(placementSpec);
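Note: for readers outside the diff context, the fallback introduced above resolves the job owner's home directory from the new homedir option and only falls back to /user/&lt;user&gt; when the option is absent. A minimal standalone sketch; the HomeDirResolver name is illustrative and not part of the patch, which inlines this logic in init():

```java
import org.apache.commons.cli.CommandLine;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.ApplicationConstants;

// Illustrative helper only; the patch keeps this logic inline in init().
final class HomeDirResolver {
  /**
   * Prefer the value passed via the new "homedir" option, otherwise fall
   * back to /user/<submitter>, the default the AM assumed before this change.
   */
  static Path resolve(CommandLine cliParser) {
    return cliParser.hasOption("homedir")
        ? new Path(cliParser.getOptionValue("homedir"))
        : new Path("/user/" + System.getenv(
            ApplicationConstants.Environment.USER.name()));
  }
}
```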
@@ -779,7 +787,7 @@ private void cleanup() {
@Override
public Void run() throws IOException {
FileSystem fs = FileSystem.get(conf);
Path dst = new Path(getAppSubmitterHomeDir(),
Path dst = new Path(homeDirectory,
getRelativePath(appName, appId.toString(), ""));
fs.delete(dst, true);
return null;
@@ -790,11 +798,6 @@ public Void run() throws IOException {
}
}

private Path getAppSubmitterHomeDir() {
return new Path("/user/" +
System.getenv(ApplicationConstants.Environment.USER.name()));
}

/**
* Main run function for the application master
*
@@ -1495,7 +1498,7 @@ public void run() {
String relativePath =
getRelativePath(appName, appId.toString(), fileName);
Path dst =
new Path(getAppSubmitterHomeDir(), relativePath);
new Path(homeDirectory, relativePath);
FileStatus fileStatus = fs.getFileStatus(dst);
LocalResource localRes = LocalResource.newInstance(
URL.fromURI(dst.toUri()),

@@ -986,6 +986,8 @@ public boolean run() throws IOException, YarnException {
}
vargs.add("--appname " + appName);

vargs.add("--homedir " + fs.getHomeDirectory());

vargs.addAll(containerRetryOptions);

vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stdout");

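Note: combined with the ApplicationMaster change earlier in this commit, the handoff works roughly as sketched below, presumably so the AM uses the same home directory the client sees instead of assuming /user/&lt;user&gt;. The class and method names here are illustrative; the real client builds the full AM command line around these vargs:

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

// Sketch of the client side of the handoff: the submitter's home directory
// is resolved on the client and forwarded to the AM via the new flag.
public final class HomeDirArgSketch {
  public static List<String> amArgs(Configuration conf, String appName)
      throws IOException {
    FileSystem fs = FileSystem.get(conf);
    List<String> vargs = new ArrayList<>();
    vargs.add("--appname " + appName);
    // New in this patch: pass the client-side home directory explicitly.
    vargs.add("--homedir " + fs.getHomeDirectory());
    return vargs;
  }
}
```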
@@ -273,11 +273,7 @@ private void publishContainerResumedEvent(
TimelineEvent tEvent = new TimelineEvent();
tEvent.setId(ContainerMetricsConstants.RESUMED_EVENT_TYPE);
tEvent.setTimestamp(event.getTimestamp());

long containerStartTime = container.getContainerStartTime();
entity.addEvent(tEvent);
entity
.setIdPrefix(TimelineServiceHelper.invertLong(containerStartTime));
dispatcher.getEventHandler().handle(new TimelinePublishEvent(entity,
containerId.getApplicationAttemptId().getApplicationId()));
}
@@ -302,11 +298,7 @@ private void publishContainerPausedEvent(
TimelineEvent tEvent = new TimelineEvent();
tEvent.setId(ContainerMetricsConstants.PAUSED_EVENT_TYPE);
tEvent.setTimestamp(event.getTimestamp());

long containerStartTime = container.getContainerStartTime();
entity.addEvent(tEvent);
entity
.setIdPrefix(TimelineServiceHelper.invertLong(containerStartTime));
dispatcher.getEventHandler().handle(new TimelinePublishEvent(entity,
containerId.getApplicationAttemptId().getApplicationId()));
}
@@ -333,11 +325,7 @@ private void publishContainerKilledEvent(
TimelineEvent tEvent = new TimelineEvent();
tEvent.setId(ContainerMetricsConstants.KILLED_EVENT_TYPE);
tEvent.setTimestamp(event.getTimestamp());

long containerStartTime = container.getContainerStartTime();
entity.addEvent(tEvent);
entity
.setIdPrefix(TimelineServiceHelper.invertLong(containerStartTime));
dispatcher.getEventHandler().handle(new TimelinePublishEvent(entity,
containerId.getApplicationAttemptId().getApplicationId()));
}

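Note on the idPrefix lines touched above: timeline entities sort newest-first by using an inverted start time as the id prefix. A small sketch of that convention, assuming TimelineServiceHelper.invertLong is the usual "Long.MAX_VALUE minus value" helper:

```java
// Sketch of the inverted-timestamp idPrefix convention used by timeline
// entities, so that larger (newer) start times sort first in the store.
public final class IdPrefixSketch {
  // Assumed to mirror TimelineServiceHelper.invertLong.
  static long invertLong(long key) {
    return Long.MAX_VALUE - key;
  }

  public static void main(String[] args) {
    long earlierStart = 1_000L;
    long laterStart = 2_000L;
    // The later container gets the smaller prefix, so it sorts ahead.
    System.out.println(invertLong(laterStart) < invertLong(earlierStart)); // true
  }
}
```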
@@ -34,14 +34,15 @@

import java.io.BufferedReader;
import java.io.File;
import java.io.FileFilter;
import java.io.FileReader;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.Callable;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileSystem;
@@ -100,6 +101,8 @@ public class TestTimelineAuthFilterForV2 {
getKeytabFile());
private static String httpSpnegoPrincipal = KerberosTestUtils.
getServerPrincipal();
private static final String ENTITY_TYPE = "dummy_type";
private static final AtomicInteger ENTITY_TYPE_SUFFIX = new AtomicInteger(0);

// First param indicates whether HTTPS access or HTTP access and second param
// indicates whether it is kerberos access or token based access.
@@ -274,11 +277,20 @@ private static TimelineEntity createEntity(String id, String type) {
}

private static void verifyEntity(File entityTypeDir, String id, String type)
throws IOException {
throws InterruptedException, IOException {
File entityFile = new File(entityTypeDir, id +
FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION);
TimelineEntity entity = null;
for (int i = 0; i < 50; i++) {
if (entityFile.exists()) {
entity = readEntityFile(entityFile);
if (entity != null) {
break;
}
}
Thread.sleep(50);
}
assertTrue(entityFile.exists());
TimelineEntity entity = readEntityFile(entityFile);
assertNotNull(entity);
assertEquals(id, entity.getId());
assertEquals(type, entity.getType());
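Note: the rewritten verifyEntity above polls for up to 50 attempts of 50 ms (roughly 2.5 seconds) before asserting. The same pattern in a reusable form, purely illustrative since the test inlines the loop:

```java
import java.util.function.BooleanSupplier;

// Generic form of the polling pattern used above: retry a condition for up
// to attempts * intervalMs before letting the assertions fail.
final class WaitFor {
  static boolean poll(BooleanSupplier condition, int attempts, long intervalMs)
      throws InterruptedException {
    for (int i = 0; i < attempts; i++) {
      if (condition.getAsBoolean()) {
        return true;
      }
      Thread.sleep(intervalMs);
    }
    return condition.getAsBoolean();
  }
}
```

For example, WaitFor.poll(entityFile::exists, 50, 50) gives approximately the same time budget the patched test allows before its assertions run.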
@@ -333,7 +345,8 @@ private boolean publishWithRetries(ApplicationId appId, File entityTypeDir,

@Test
public void testPutTimelineEntities() throws Exception {
final String entityType = "dummy_type";
final String entityType = ENTITY_TYPE +
ENTITY_TYPE_SUFFIX.getAndIncrement();
ApplicationId appId = ApplicationId.newInstance(0, 1);
File entityTypeDir = new File(TEST_ROOT_DIR.getAbsolutePath() +
File.separator + "entities" + File.separator +
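Note: the unique entity type above gives each parameterized run its own entity directory, since the directory path below ends with the entity type, presumably to keep runs from interfering with one another. Roughly:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Rough illustration of the unique-per-run entity type: successive runs get
// dummy_type0, dummy_type1, ... and therefore separate entity directories
// in the filesystem timeline store.
public final class EntityTypeNaming {
  private static final String ENTITY_TYPE = "dummy_type";
  private static final AtomicInteger ENTITY_TYPE_SUFFIX = new AtomicInteger(0);

  static String nextEntityType() {
    return ENTITY_TYPE + ENTITY_TYPE_SUFFIX.getAndIncrement();
  }
}
```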
@@ -342,92 +355,92 @@ public void testPutTimelineEntities() throws Exception {
File.separator + "test_flow_name" + File.separator +
"test_flow_version" + File.separator + "1" + File.separator +
appId.toString() + File.separator + entityType);
try {
if (withKerberosLogin) {
KerberosTestUtils.doAs(HTTP_USER + "/localhost", new Callable<Void>() {
@Override
public Void call() throws Exception {
publishAndVerifyEntity(appId, entityTypeDir, entityType, 1);
return null;
}
});
} else {
assertTrue("Entities should have been published successfully.",
publishWithRetries(appId, entityTypeDir, entityType, 1));

AppLevelTimelineCollector collector =
(AppLevelTimelineCollector) collectorManager.get(appId);
Token<TimelineDelegationTokenIdentifier> token =
collector.getDelegationTokenForApp();
assertNotNull(token);

// Verify if token is renewed automatically and entities can still be
// published.
Thread.sleep(1000);
// Entities should publish successfully after renewal.
assertTrue("Entities should have been published successfully.",
publishWithRetries(appId, entityTypeDir, entityType, 2));
assertNotNull(collector);
verify(collectorManager.getTokenManagerService(), atLeastOnce()).
renewToken(eq(collector.getDelegationTokenForApp()),
any(String.class));

// Wait to ensure lifetime of token expires and ensure its regenerated
// automatically.
Thread.sleep(3000);
for (int i = 0; i < 40; i++) {
if (!token.equals(collector.getDelegationTokenForApp())) {
break;
}
Thread.sleep(50);
}
assertNotEquals("Token should have been regenerated.", token,
collector.getDelegationTokenForApp());
Thread.sleep(1000);
// Try publishing with the old token in UGI. Publishing should fail due
// to invalid token.
try {
publishAndVerifyEntity(appId, entityTypeDir, entityType, 2);
fail("Exception should have been thrown due to Invalid Token.");
} catch (YarnException e) {
assertTrue("Exception thrown should have been due to Invalid Token.",
e.getCause().getMessage().contains("InvalidToken"));
if (withKerberosLogin) {
KerberosTestUtils.doAs(HTTP_USER + "/localhost", new Callable<Void>() {
@Override
public Void call() throws Exception {
publishAndVerifyEntity(appId, entityTypeDir, entityType, 1);
return null;
}
});
} else {
assertTrue("Entities should have been published successfully.",
publishWithRetries(appId, entityTypeDir, entityType, 1));

// Update the regenerated token in UGI and retry publishing entities.
Token<TimelineDelegationTokenIdentifier> regeneratedToken =
collector.getDelegationTokenForApp();
regeneratedToken.setService(new Text("localhost" +
regeneratedToken.getService().toString().substring(
regeneratedToken.getService().toString().indexOf(":"))));
UserGroupInformation.getCurrentUser().addToken(regeneratedToken);
assertTrue("Entities should have been published successfully.",
publishWithRetries(appId, entityTypeDir, entityType, 2));
// Token was generated twice, once when app collector was created and
// later after token lifetime expiry.
verify(collectorManager.getTokenManagerService(), times(2)).
generateToken(any(UserGroupInformation.class), any(String.class));
assertEquals(1, ((DummyNodeTimelineCollectorManager) collectorManager).
getTokenExpiredCnt());
}
// Wait for async entity to be published.
for (int i = 0; i < 50; i++) {
if (entityTypeDir.listFiles().length == 2) {
AppLevelTimelineCollector collector =
(AppLevelTimelineCollector) collectorManager.get(appId);
Token<TimelineDelegationTokenIdentifier> token =
collector.getDelegationTokenForApp();
assertNotNull(token);

// Verify if token is renewed automatically and entities can still be
// published.
Thread.sleep(1000);
// Entities should publish successfully after renewal.
assertTrue("Entities should have been published successfully.",
publishWithRetries(appId, entityTypeDir, entityType, 2));
assertNotNull(collector);
verify(collectorManager.getTokenManagerService(), atLeastOnce()).
renewToken(eq(collector.getDelegationTokenForApp()),
any(String.class));

// Wait to ensure lifetime of token expires and ensure its regenerated
// automatically.
Thread.sleep(3000);
for (int i = 0; i < 40; i++) {
if (!token.equals(collector.getDelegationTokenForApp())) {
break;
}
Thread.sleep(50);
}
assertEquals(2, entityTypeDir.listFiles().length);
verifyEntity(entityTypeDir, "entity2", entityType);
AppLevelTimelineCollector collector =
(AppLevelTimelineCollector)collectorManager.get(appId);
assertNotNull(collector);
auxService.removeApplication(appId);
verify(collectorManager.getTokenManagerService()).cancelToken(
eq(collector.getDelegationTokenForApp()), any(String.class));
} finally {
FileUtils.deleteQuietly(entityTypeDir);
assertNotEquals("Token should have been regenerated.", token,
collector.getDelegationTokenForApp());
Thread.sleep(1000);
// Try publishing with the old token in UGI. Publishing should fail due
// to invalid token.
try {
publishAndVerifyEntity(appId, entityTypeDir, entityType, 2);
fail("Exception should have been thrown due to Invalid Token.");
} catch (YarnException e) {
assertTrue("Exception thrown should have been due to Invalid Token.",
e.getCause().getMessage().contains("InvalidToken"));
}

// Update the regenerated token in UGI and retry publishing entities.
Token<TimelineDelegationTokenIdentifier> regeneratedToken =
collector.getDelegationTokenForApp();
regeneratedToken.setService(new Text("localhost" +
regeneratedToken.getService().toString().substring(
regeneratedToken.getService().toString().indexOf(":"))));
UserGroupInformation.getCurrentUser().addToken(regeneratedToken);
assertTrue("Entities should have been published successfully.",
publishWithRetries(appId, entityTypeDir, entityType, 2));
// Token was generated twice, once when app collector was created and
// later after token lifetime expiry.
verify(collectorManager.getTokenManagerService(), times(2)).
generateToken(any(UserGroupInformation.class), any(String.class));
assertEquals(1, ((DummyNodeTimelineCollectorManager) collectorManager).
getTokenExpiredCnt());
}
// Wait for async entity to be published.
FileFilter tmpFilter = (pathname -> !pathname.getName().endsWith(".tmp"));
File[] entities = null;
for (int i = 0; i < 50; i++) {
entities = entityTypeDir.listFiles(tmpFilter);
if (entities != null && entities.length == 2) {
break;
}
Thread.sleep(50);
}
assertNotNull("Error reading entityTypeDir", entities);
assertEquals(2, entities.length);
verifyEntity(entityTypeDir, "entity2", entityType);
AppLevelTimelineCollector collector =
(AppLevelTimelineCollector)collectorManager.get(appId);
assertNotNull(collector);
auxService.removeApplication(appId);
verify(collectorManager.getTokenManagerService()).cancelToken(
eq(collector.getDelegationTokenForApp()), any(String.class));
}

private static class DummyNodeTimelineCollectorManager extends
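A final note on the updated wait loop in the test above: it counts only completed entity files, skipping in-progress .tmp files. A stripped-down version of that filter, illustrative only:

```java
import java.io.File;
import java.io.FileFilter;

// Stripped-down version of the filter the updated test uses while waiting
// for asynchronously published entities: in-progress ".tmp" files are
// ignored so only completed entity files are counted.
public final class CompletedEntityFiles {
  static final FileFilter SKIP_TMP =
      pathname -> !pathname.getName().endsWith(".tmp");

  static File[] list(File entityTypeDir) {
    // listFiles can return null if the directory does not exist yet.
    return entityTypeDir.listFiles(SKIP_TMP);
  }
}
```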