YARN-10664. Allow parameter expansion in NM_ADMIN_USER_ENV. Contributed by Jim
Brennan.
This commit is contained in:
parent
e82e7c597a
commit
9d088639bf
@ -179,15 +179,36 @@ public static String expandEnvironment(String var,
|
|||||||
return var;
|
return var;
|
||||||
}
|
}
|
||||||
|
|
||||||
private Map<String, String> expandAllEnvironmentVars(
|
private void expandAllEnvironmentVars(
|
||||||
ContainerLaunchContext launchContext, Path containerLogDir) {
|
Map<String, String> environment, Path containerLogDir) {
|
||||||
Map<String, String> environment = launchContext.getEnvironment();
|
|
||||||
for (Entry<String, String> entry : environment.entrySet()) {
|
for (Entry<String, String> entry : environment.entrySet()) {
|
||||||
String value = entry.getValue();
|
String value = entry.getValue();
|
||||||
value = expandEnvironment(value, containerLogDir);
|
value = expandEnvironment(value, containerLogDir);
|
||||||
entry.setValue(value);
|
entry.setValue(value);
|
||||||
}
|
}
|
||||||
return environment;
|
}
|
||||||
|
|
||||||
|
private void addKeystoreVars(Map<String, String> environment,
|
||||||
|
Path containerWorkDir) {
|
||||||
|
environment.put(ApplicationConstants.KEYSTORE_FILE_LOCATION_ENV_NAME,
|
||||||
|
new Path(containerWorkDir,
|
||||||
|
ContainerLaunch.KEYSTORE_FILE).toUri().getPath());
|
||||||
|
environment.put(ApplicationConstants.KEYSTORE_PASSWORD_ENV_NAME,
|
||||||
|
new String(container.getCredentials().getSecretKey(
|
||||||
|
AMSecretKeys.YARN_APPLICATION_AM_KEYSTORE_PASSWORD),
|
||||||
|
StandardCharsets.UTF_8));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void addTruststoreVars(Map<String, String> environment,
|
||||||
|
Path containerWorkDir) {
|
||||||
|
environment.put(
|
||||||
|
ApplicationConstants.TRUSTSTORE_FILE_LOCATION_ENV_NAME,
|
||||||
|
new Path(containerWorkDir,
|
||||||
|
ContainerLaunch.TRUSTSTORE_FILE).toUri().getPath());
|
||||||
|
environment.put(ApplicationConstants.TRUSTSTORE_PASSWORD_ENV_NAME,
|
||||||
|
new String(container.getCredentials().getSecretKey(
|
||||||
|
AMSecretKeys.YARN_APPLICATION_AM_TRUSTSTORE_PASSWORD),
|
||||||
|
StandardCharsets.UTF_8));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -222,8 +243,10 @@ public Integer call() {
|
|||||||
}
|
}
|
||||||
launchContext.setCommands(newCmds);
|
launchContext.setCommands(newCmds);
|
||||||
|
|
||||||
Map<String, String> environment = expandAllEnvironmentVars(
|
// The actual expansion of environment variables happens after calling
|
||||||
launchContext, containerLogDir);
|
// sanitizeEnv. This allows variables specified in NM_ADMIN_USER_ENV
|
||||||
|
// to reference user or container-defined variables.
|
||||||
|
Map<String, String> environment = launchContext.getEnvironment();
|
||||||
// /////////////////////////// End of variable expansion
|
// /////////////////////////// End of variable expansion
|
||||||
|
|
||||||
// Use this to track variables that are added to the environment by nm.
|
// Use this to track variables that are added to the environment by nm.
|
||||||
@ -289,13 +312,6 @@ public Integer call() {
|
|||||||
lfs.create(nmPrivateKeystorePath,
|
lfs.create(nmPrivateKeystorePath,
|
||||||
EnumSet.of(CREATE, OVERWRITE))) {
|
EnumSet.of(CREATE, OVERWRITE))) {
|
||||||
keystoreOutStream.write(keystore);
|
keystoreOutStream.write(keystore);
|
||||||
environment.put(ApplicationConstants.KEYSTORE_FILE_LOCATION_ENV_NAME,
|
|
||||||
new Path(containerWorkDir,
|
|
||||||
ContainerLaunch.KEYSTORE_FILE).toUri().getPath());
|
|
||||||
environment.put(ApplicationConstants.KEYSTORE_PASSWORD_ENV_NAME,
|
|
||||||
new String(container.getCredentials().getSecretKey(
|
|
||||||
AMSecretKeys.YARN_APPLICATION_AM_KEYSTORE_PASSWORD),
|
|
||||||
StandardCharsets.UTF_8));
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
nmPrivateKeystorePath = null;
|
nmPrivateKeystorePath = null;
|
||||||
@ -307,14 +323,6 @@ public Integer call() {
|
|||||||
lfs.create(nmPrivateTruststorePath,
|
lfs.create(nmPrivateTruststorePath,
|
||||||
EnumSet.of(CREATE, OVERWRITE))) {
|
EnumSet.of(CREATE, OVERWRITE))) {
|
||||||
truststoreOutStream.write(truststore);
|
truststoreOutStream.write(truststore);
|
||||||
environment.put(
|
|
||||||
ApplicationConstants.TRUSTSTORE_FILE_LOCATION_ENV_NAME,
|
|
||||||
new Path(containerWorkDir,
|
|
||||||
ContainerLaunch.TRUSTSTORE_FILE).toUri().getPath());
|
|
||||||
environment.put(ApplicationConstants.TRUSTSTORE_PASSWORD_ENV_NAME,
|
|
||||||
new String(container.getCredentials().getSecretKey(
|
|
||||||
AMSecretKeys.YARN_APPLICATION_AM_TRUSTSTORE_PASSWORD),
|
|
||||||
StandardCharsets.UTF_8));
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
nmPrivateTruststorePath = null;
|
nmPrivateTruststorePath = null;
|
||||||
@ -335,6 +343,16 @@ public Integer call() {
|
|||||||
containerLogDirs, localResources, nmPrivateClasspathJarDir,
|
containerLogDirs, localResources, nmPrivateClasspathJarDir,
|
||||||
nmEnvVars);
|
nmEnvVars);
|
||||||
|
|
||||||
|
expandAllEnvironmentVars(environment, containerLogDir);
|
||||||
|
|
||||||
|
// Add these if needed after expanding so we don't expand key values.
|
||||||
|
if (keystore != null) {
|
||||||
|
addKeystoreVars(environment, containerWorkDir);
|
||||||
|
}
|
||||||
|
if (truststore != null) {
|
||||||
|
addTruststoreVars(environment, containerWorkDir);
|
||||||
|
}
|
||||||
|
|
||||||
prepareContainer(localResources, containerLocalDirs);
|
prepareContainer(localResources, containerLocalDirs);
|
||||||
|
|
||||||
// Write out the environment
|
// Write out the environment
|
||||||
@ -1628,13 +1646,13 @@ public void sanitizeEnv(Map<String, String> environment, Path pwd,
|
|||||||
}
|
}
|
||||||
|
|
||||||
// variables here will be forced in, even if the container has
|
// variables here will be forced in, even if the container has
|
||||||
// specified them.
|
// specified them. Note: we do not track these in nmVars, to
|
||||||
|
// allow them to be ordered properly if they reference variables
|
||||||
|
// defined by the user.
|
||||||
String defEnvStr = conf.get(YarnConfiguration.DEFAULT_NM_ADMIN_USER_ENV);
|
String defEnvStr = conf.get(YarnConfiguration.DEFAULT_NM_ADMIN_USER_ENV);
|
||||||
Apps.setEnvFromInputProperty(environment,
|
Apps.setEnvFromInputProperty(environment,
|
||||||
YarnConfiguration.NM_ADMIN_USER_ENV, defEnvStr, conf,
|
YarnConfiguration.NM_ADMIN_USER_ENV, defEnvStr, conf,
|
||||||
File.pathSeparator);
|
File.pathSeparator);
|
||||||
nmVars.addAll(Apps.getEnvVarsFromInputProperty(
|
|
||||||
YarnConfiguration.NM_ADMIN_USER_ENV, defEnvStr, conf));
|
|
||||||
|
|
||||||
if (!Shell.WINDOWS) {
|
if (!Shell.WINDOWS) {
|
||||||
// maybe force path components
|
// maybe force path components
|
||||||
|
@ -673,7 +673,7 @@ public void testPrependDistcache() throws Exception {
|
|||||||
Container container = mock(Container.class);
|
Container container = mock(Container.class);
|
||||||
when(container.getContainerId()).thenReturn(cId);
|
when(container.getContainerId()).thenReturn(cId);
|
||||||
when(container.getLaunchContext()).thenReturn(containerLaunchContext);
|
when(container.getLaunchContext()).thenReturn(containerLaunchContext);
|
||||||
when(container.getLocalizedResources()).thenReturn(null);
|
when(container.localizationCountersAsString()).thenReturn("1,2,3,4,5");
|
||||||
Dispatcher dispatcher = mock(Dispatcher.class);
|
Dispatcher dispatcher = mock(Dispatcher.class);
|
||||||
EventHandler<Event> eventHandler = new EventHandler<Event>() {
|
EventHandler<Event> eventHandler = new EventHandler<Event>() {
|
||||||
public void handle(Event event) {
|
public void handle(Event event) {
|
||||||
@ -814,8 +814,6 @@ public void handle(Event event) {
|
|||||||
Assert.assertTrue(userSetEnv.containsKey(testKey1));
|
Assert.assertTrue(userSetEnv.containsKey(testKey1));
|
||||||
Assert.assertTrue(userSetEnv.containsKey(testKey2));
|
Assert.assertTrue(userSetEnv.containsKey(testKey2));
|
||||||
Assert.assertTrue(userSetEnv.containsKey(testKey3));
|
Assert.assertTrue(userSetEnv.containsKey(testKey3));
|
||||||
Assert.assertTrue(nmEnvTrack.contains("MALLOC_ARENA_MAX"));
|
|
||||||
Assert.assertTrue(nmEnvTrack.contains("MOUNT_LIST"));
|
|
||||||
Assert.assertEquals(userMallocArenaMaxVal + File.pathSeparator
|
Assert.assertEquals(userMallocArenaMaxVal + File.pathSeparator
|
||||||
+ mallocArenaMaxVal, userSetEnv.get("MALLOC_ARENA_MAX"));
|
+ mallocArenaMaxVal, userSetEnv.get("MALLOC_ARENA_MAX"));
|
||||||
Assert.assertEquals(testVal1, userSetEnv.get(testKey1));
|
Assert.assertEquals(testVal1, userSetEnv.get(testKey1));
|
||||||
@ -1857,6 +1855,7 @@ public void testContainerLaunchOnConfigurationError() throws Exception {
|
|||||||
when(id.toString()).thenReturn("1");
|
when(id.toString()).thenReturn("1");
|
||||||
when(container.getContainerId()).thenReturn(id);
|
when(container.getContainerId()).thenReturn(id);
|
||||||
when(container.getUser()).thenReturn("user");
|
when(container.getUser()).thenReturn("user");
|
||||||
|
when(container.localizationCountersAsString()).thenReturn("1,2,3,4,5");
|
||||||
ContainerLaunchContext clc = mock(ContainerLaunchContext.class);
|
ContainerLaunchContext clc = mock(ContainerLaunchContext.class);
|
||||||
when(clc.getCommands()).thenReturn(Lists.newArrayList());
|
when(clc.getCommands()).thenReturn(Lists.newArrayList());
|
||||||
when(container.getLaunchContext()).thenReturn(clc);
|
when(container.getLaunchContext()).thenReturn(clc);
|
||||||
@ -2453,6 +2452,7 @@ public void testDistributedCacheDirs() throws Exception {
|
|||||||
.newContainerId(ApplicationAttemptId.newInstance(appId, 1), 1);
|
.newContainerId(ApplicationAttemptId.newInstance(appId, 1), 1);
|
||||||
when(container.getContainerId()).thenReturn(containerId);
|
when(container.getContainerId()).thenReturn(containerId);
|
||||||
when(container.getUser()).thenReturn("test");
|
when(container.getUser()).thenReturn("test");
|
||||||
|
when(container.localizationCountersAsString()).thenReturn("1,2,3,4,5");
|
||||||
|
|
||||||
when(container.getLocalizedResources())
|
when(container.getLocalizedResources())
|
||||||
.thenReturn(Collections.<Path, List<String>> emptyMap());
|
.thenReturn(Collections.<Path, List<String>> emptyMap());
|
||||||
@ -2562,6 +2562,7 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
|
|||||||
when(container.getLaunchContext()).thenReturn(clc);
|
when(container.getLaunchContext()).thenReturn(clc);
|
||||||
Credentials credentials = mock(Credentials.class);
|
Credentials credentials = mock(Credentials.class);
|
||||||
when(container.getCredentials()).thenReturn(credentials);
|
when(container.getCredentials()).thenReturn(credentials);
|
||||||
|
when(container.localizationCountersAsString()).thenReturn("1,2,3,4,5");
|
||||||
doAnswer(new Answer<Void>() {
|
doAnswer(new Answer<Void>() {
|
||||||
@Override
|
@Override
|
||||||
public Void answer(InvocationOnMock invocation) throws Throwable {
|
public Void answer(InvocationOnMock invocation) throws Throwable {
|
||||||
@ -2662,4 +2663,94 @@ private String readStringFromPath(Path p) throws IOException {
|
|||||||
return new String(bytes);
|
return new String(bytes);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 20000)
|
||||||
|
public void testExpandNmAdmEnv() throws Exception {
|
||||||
|
// setup mocks
|
||||||
|
Dispatcher dispatcher = mock(Dispatcher.class);
|
||||||
|
EventHandler handler = mock(EventHandler.class);
|
||||||
|
when(dispatcher.getEventHandler()).thenReturn(handler);
|
||||||
|
ContainerExecutor containerExecutor = mock(ContainerExecutor.class);
|
||||||
|
doAnswer(new Answer<Void>() {
|
||||||
|
@Override
|
||||||
|
public Void answer(InvocationOnMock invocation) throws Throwable {
|
||||||
|
Object[] args = invocation.getArguments();
|
||||||
|
DataOutputStream dos = (DataOutputStream) args[0];
|
||||||
|
dos.writeBytes("script");
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}).when(containerExecutor).writeLaunchEnv(
|
||||||
|
any(), any(), any(), any(), any(), any(), any());
|
||||||
|
Application app = mock(Application.class);
|
||||||
|
ApplicationId appId = mock(ApplicationId.class);
|
||||||
|
when(appId.toString()).thenReturn("1");
|
||||||
|
when(app.getAppId()).thenReturn(appId);
|
||||||
|
Container container = mock(Container.class);
|
||||||
|
ContainerId id = mock(ContainerId.class);
|
||||||
|
when(id.toString()).thenReturn("1");
|
||||||
|
when(container.getContainerId()).thenReturn(id);
|
||||||
|
when(container.getUser()).thenReturn("user");
|
||||||
|
ContainerLaunchContext clc = mock(ContainerLaunchContext.class);
|
||||||
|
when(clc.getCommands()).thenReturn(Lists.newArrayList());
|
||||||
|
when(container.getLaunchContext()).thenReturn(clc);
|
||||||
|
Credentials credentials = mock(Credentials.class);
|
||||||
|
when(container.getCredentials()).thenReturn(credentials);
|
||||||
|
when(container.localizationCountersAsString()).thenReturn("1,2,3,4,5");
|
||||||
|
|
||||||
|
// Define user environment variables.
|
||||||
|
Map<String, String> userSetEnv = new HashMap<String, String>();
|
||||||
|
String userVar = "USER_VAR";
|
||||||
|
String userVarVal = "user-var-value";
|
||||||
|
userSetEnv.put(userVar, userVarVal);
|
||||||
|
when(clc.getEnvironment()).thenReturn(userSetEnv);
|
||||||
|
|
||||||
|
YarnConfiguration localConf = new YarnConfiguration(conf);
|
||||||
|
|
||||||
|
// Admin Env var that depends on USER_VAR1
|
||||||
|
String testKey1 = "TEST_KEY1";
|
||||||
|
String testVal1 = "relies on {{USER_VAR}}";
|
||||||
|
localConf.set(
|
||||||
|
YarnConfiguration.NM_ADMIN_USER_ENV + "." + testKey1, testVal1);
|
||||||
|
String testVal1Expanded; // this is what we expect after {{}} expansion
|
||||||
|
if (Shell.WINDOWS) {
|
||||||
|
testVal1Expanded = "relies on %USER_VAR%";
|
||||||
|
} else {
|
||||||
|
testVal1Expanded = "relies on $USER_VAR";
|
||||||
|
}
|
||||||
|
// Another Admin Env var that depends on the first one
|
||||||
|
String testKey2 = "TEST_KEY2";
|
||||||
|
String testVal2 = "relies on {{TEST_KEY1}}";
|
||||||
|
localConf.set(
|
||||||
|
YarnConfiguration.NM_ADMIN_USER_ENV + "." + testKey2, testVal2);
|
||||||
|
String testVal2Expanded; // this is what we expect after {{}} expansion
|
||||||
|
if (Shell.WINDOWS) {
|
||||||
|
testVal2Expanded = "relies on %TEST_KEY1%";
|
||||||
|
} else {
|
||||||
|
testVal2Expanded = "relies on $TEST_KEY1";
|
||||||
|
}
|
||||||
|
|
||||||
|
// call containerLaunch
|
||||||
|
ContainerLaunch containerLaunch = new ContainerLaunch(
|
||||||
|
distContext, localConf, dispatcher,
|
||||||
|
containerExecutor, app, container, dirsHandler, containerManager);
|
||||||
|
containerLaunch.call();
|
||||||
|
|
||||||
|
// verify the nmPrivate paths and files
|
||||||
|
ArgumentCaptor<ContainerStartContext> cscArgument =
|
||||||
|
ArgumentCaptor.forClass(ContainerStartContext.class);
|
||||||
|
verify(containerExecutor, times(1)).launchContainer(cscArgument.capture());
|
||||||
|
ContainerStartContext csc = cscArgument.getValue();
|
||||||
|
Assert.assertEquals("script",
|
||||||
|
readStringFromPath(csc.getNmPrivateContainerScriptPath()));
|
||||||
|
|
||||||
|
// verify env
|
||||||
|
ArgumentCaptor<Map> envArgument = ArgumentCaptor.forClass(Map.class);
|
||||||
|
verify(containerExecutor, times(1)).writeLaunchEnv(any(),
|
||||||
|
envArgument.capture(), any(), any(), any(), any(), any());
|
||||||
|
Map env = envArgument.getValue();
|
||||||
|
Assert.assertEquals(userVarVal, env.get(userVar));
|
||||||
|
Assert.assertEquals(testVal1Expanded, env.get(testKey1));
|
||||||
|
Assert.assertEquals(testVal2Expanded, env.get(testKey2));
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user