YARN-5068. Expose scheduler queue to application master. (Harish Jaiprakash via rohithsharmaks)
This commit is contained in:
parent
d464f4d1c4
commit
b7ac85259c
@ -159,6 +159,13 @@ public enum Environment {
|
|||||||
*/
|
*/
|
||||||
LD_LIBRARY_PATH("LD_LIBRARY_PATH"),
|
LD_LIBRARY_PATH("LD_LIBRARY_PATH"),
|
||||||
|
|
||||||
|
/**
|
||||||
|
* $YARN_RESOURCEMANAGER_APPLICATION_QUEUE
|
||||||
|
* The queue into which the app was submitted/launched.
|
||||||
|
*/
|
||||||
|
YARN_RESOURCEMANAGER_APPLICATION_QUEUE(
|
||||||
|
"YARN_RESOURCEMANAGER_APPLICATION_QUEUE"),
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* $HADOOP_CONF_DIR
|
* $HADOOP_CONF_DIR
|
||||||
* Final, non-modifiable.
|
* Final, non-modifiable.
|
||||||
|
@ -51,6 +51,7 @@
|
|||||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
import org.apache.hadoop.yarn.client.NMProxy;
|
import org.apache.hadoop.yarn.client.NMProxy;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.event.EventHandler;
|
import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
||||||
@ -191,12 +192,25 @@ private ContainerLaunchContext createAMContainerLaunchContext(
|
|||||||
+ StringUtils.arrayToString(container.getCommands().toArray(
|
+ StringUtils.arrayToString(container.getCommands().toArray(
|
||||||
new String[0])));
|
new String[0])));
|
||||||
|
|
||||||
|
// Populate the current queue name in the environment variable.
|
||||||
|
setupQueueNameEnv(container, applicationMasterContext);
|
||||||
|
|
||||||
// Finalize the container
|
// Finalize the container
|
||||||
setupTokens(container, containerID);
|
setupTokens(container, containerID);
|
||||||
|
|
||||||
return container;
|
return container;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void setupQueueNameEnv(ContainerLaunchContext container,
|
||||||
|
ApplicationSubmissionContext applicationMasterContext) {
|
||||||
|
String queueName = applicationMasterContext.getQueue();
|
||||||
|
if (queueName == null) {
|
||||||
|
queueName = YarnConfiguration.DEFAULT_QUEUE_NAME;
|
||||||
|
}
|
||||||
|
container.getEnvironment().put(ApplicationConstants.Environment
|
||||||
|
.YARN_RESOURCEMANAGER_APPLICATION_QUEUE.key(), queueName);
|
||||||
|
}
|
||||||
|
|
||||||
@Private
|
@Private
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
protected void setupTokens(
|
protected void setupTokens(
|
||||||
|
@ -92,6 +92,7 @@ private static final class MyContainerManagerImpl implements
|
|||||||
String nmHostAtContainerManager = null;
|
String nmHostAtContainerManager = null;
|
||||||
long submitTimeAtContainerManager;
|
long submitTimeAtContainerManager;
|
||||||
int maxAppAttempts;
|
int maxAppAttempts;
|
||||||
|
private String queueName;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public StartContainersResponse
|
public StartContainersResponse
|
||||||
@ -120,6 +121,8 @@ private static final class MyContainerManagerImpl implements
|
|||||||
submitTimeAtContainerManager =
|
submitTimeAtContainerManager =
|
||||||
Long.parseLong(env.get(ApplicationConstants.APP_SUBMIT_TIME_ENV));
|
Long.parseLong(env.get(ApplicationConstants.APP_SUBMIT_TIME_ENV));
|
||||||
maxAppAttempts = YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS;
|
maxAppAttempts = YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS;
|
||||||
|
queueName = env.get(ApplicationConstants.Environment
|
||||||
|
.YARN_RESOURCEMANAGER_APPLICATION_QUEUE.key());
|
||||||
return StartContainersResponse.newInstance(
|
return StartContainersResponse.newInstance(
|
||||||
new HashMap<String, ByteBuffer>(), new ArrayList<ContainerId>(),
|
new HashMap<String, ByteBuffer>(), new ArrayList<ContainerId>(),
|
||||||
new HashMap<ContainerId, SerializedException>());
|
new HashMap<ContainerId, SerializedException>());
|
||||||
@ -182,6 +185,8 @@ public void testAMLaunchAndCleanup() throws Exception {
|
|||||||
containerManager.nmHostAtContainerManager);
|
containerManager.nmHostAtContainerManager);
|
||||||
Assert.assertEquals(YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS,
|
Assert.assertEquals(YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS,
|
||||||
containerManager.maxAppAttempts);
|
containerManager.maxAppAttempts);
|
||||||
|
Assert.assertEquals(YarnConfiguration.DEFAULT_QUEUE_NAME,
|
||||||
|
containerManager.queueName);
|
||||||
|
|
||||||
MockAM am = new MockAM(rm.getRMContext(), rm
|
MockAM am = new MockAM(rm.getRMContext(), rm
|
||||||
.getApplicationMasterService(), appAttemptId);
|
.getApplicationMasterService(), appAttemptId);
|
||||||
|
Loading…
Reference in New Issue
Block a user