YARN-8960. [Submarine] Can't get submarine service status using the command of "yarn app -status" under security environment. (Zac Zhou via wangda)
Change-Id: I21b1addc9c32817650ea744a8f2e6b5602f2f4d4
This commit is contained in:
parent
6357803645
commit
8b23814415
@ -16,7 +16,6 @@
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.yarn.submarine.common.ClientContext;
|
||||
import org.apache.hadoop.yarn.submarine.common.fs.DefaultRemoteDirectoryManager;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.submarine.runtimes.RuntimeFactory;
|
||||
import org.slf4j.Logger;
|
||||
@ -43,8 +42,6 @@ private static ClientContext getClientContext() {
|
||||
Configuration conf = new YarnConfiguration();
|
||||
ClientContext clientContext = new ClientContext();
|
||||
clientContext.setConfiguration(conf);
|
||||
clientContext.setRemoteDirectoryManager(
|
||||
new DefaultRemoteDirectoryManager(clientContext));
|
||||
RuntimeFactory runtimeFactory = RuntimeFactory.getRuntimeFactory(
|
||||
clientContext);
|
||||
clientContext.setRuntimeFactory(runtimeFactory);
|
||||
|
@ -52,4 +52,7 @@ public class CliConstants {
|
||||
public static final String QUICKLINK = "quicklink";
|
||||
public static final String TENSORBOARD_DOCKER_IMAGE =
|
||||
"tensorboard_docker_image";
|
||||
public static final String KEYTAB = "keytab";
|
||||
public static final String PRINCIPAL = "principal";
|
||||
public static final String DISTRIBUTE_KEYTAB = "distribute_keytab";
|
||||
}
|
||||
|
@ -14,22 +14,33 @@
|
||||
|
||||
package org.apache.hadoop.yarn.submarine.client.cli;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceInformation;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceTypeInfo;
|
||||
import org.apache.hadoop.yarn.exceptions.ResourceNotFoundException;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.submarine.client.cli.param.RunJobParameters;
|
||||
import org.apache.hadoop.yarn.submarine.common.exception.SubmarineRuntimeException;
|
||||
import org.apache.hadoop.yarn.submarine.common.fs.RemoteDirectoryManager;
|
||||
import org.apache.hadoop.yarn.util.UnitsConversionUtil;
|
||||
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.apache.hadoop.yarn.submarine.client.cli.CliConstants.KEYTAB;
|
||||
import static org.apache.hadoop.yarn.submarine.client.cli.CliConstants.PRINCIPAL;
|
||||
|
||||
public class CliUtils {
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(CliUtils.class);
|
||||
private final static String RES_PATTERN = "^[^=]+=\\d+\\s?\\w*$";
|
||||
/**
|
||||
* Replace patterns inside cli
|
||||
@ -161,4 +172,49 @@ public static boolean argsForHelp(String[] args) {
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
public static void doLoginIfSecure(String keytab, String principal) throws
|
||||
IOException {
|
||||
if (!UserGroupInformation.isSecurityEnabled()) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (StringUtils.isEmpty(keytab) || StringUtils.isEmpty(principal)) {
|
||||
if (StringUtils.isNotEmpty(keytab)) {
|
||||
SubmarineRuntimeException e = new SubmarineRuntimeException("The " +
|
||||
"parameter of " + PRINCIPAL + " is missing.");
|
||||
LOG.error(e.getMessage(), e);
|
||||
throw e;
|
||||
}
|
||||
|
||||
if (StringUtils.isNotEmpty(principal)) {
|
||||
SubmarineRuntimeException e = new SubmarineRuntimeException("The " +
|
||||
"parameter of " + KEYTAB + " is missing.");
|
||||
LOG.error(e.getMessage(), e);
|
||||
throw e;
|
||||
}
|
||||
|
||||
UserGroupInformation user = UserGroupInformation.getCurrentUser();
|
||||
if(user == null || user.getAuthenticationMethod() ==
|
||||
UserGroupInformation.AuthenticationMethod.SIMPLE) {
|
||||
SubmarineRuntimeException e = new SubmarineRuntimeException("Failed " +
|
||||
"to authenticate in secure environment. Please run kinit " +
|
||||
"command in advance or use " + "--" + KEYTAB + "/--" + PRINCIPAL +
|
||||
" parameters");
|
||||
LOG.error(e.getMessage(), e);
|
||||
throw e;
|
||||
}
|
||||
LOG.info("Submarine job is submitted by user: " + user.getUserName());
|
||||
return;
|
||||
}
|
||||
|
||||
File keytabFile = new File(keytab);
|
||||
if (!keytabFile.exists()) {
|
||||
SubmarineRuntimeException e = new SubmarineRuntimeException("No " +
|
||||
"keytab localized at " + keytab);
|
||||
LOG.error(e.getMessage(), e);
|
||||
throw e;
|
||||
}
|
||||
UserGroupInformation.loginUserFromKeytab(principal, keytab);
|
||||
}
|
||||
}
|
||||
|
@ -125,6 +125,15 @@ private Options generateOptions() {
|
||||
+ "if want to link to first worker's 7070 port, and text of quicklink "
|
||||
+ "is Notebook_UI, user need to specify --quicklink "
|
||||
+ "Notebook_UI=https://master-0:7070");
|
||||
options.addOption(CliConstants.KEYTAB, true, "Specify keytab used by the " +
|
||||
"job under security environment");
|
||||
options.addOption(CliConstants.PRINCIPAL, true, "Specify principal used " +
|
||||
"by the job under security environment");
|
||||
options.addOption(CliConstants.DISTRIBUTE_KEYTAB, false, "Distribute " +
|
||||
"local keytab to cluster machines for service authentication. If not " +
|
||||
"sepcified, pre-destributed keytab of which path specified by" +
|
||||
" parameter" + CliConstants.KEYTAB + " on cluster machines will be " +
|
||||
"used");
|
||||
options.addOption("h", "help", false, "Print help");
|
||||
return options;
|
||||
}
|
||||
@ -153,7 +162,8 @@ private void parseCommandLineAndGetRunJobParameters(String[] args)
|
||||
// Do parsing
|
||||
GnuParser parser = new GnuParser();
|
||||
CommandLine cli = parser.parse(options, args);
|
||||
parameters.updateParametersByParsedCommandline(cli, options, clientContext);
|
||||
parameters.updateParametersByParsedCommandline(cli, options,
|
||||
clientContext);
|
||||
} catch (ParseException e) {
|
||||
LOG.error("Exception in parse:", e.getMessage());
|
||||
printUsages();
|
||||
|
@ -51,6 +51,10 @@ public class RunJobParameters extends RunParameters {
|
||||
private boolean waitJobFinish = false;
|
||||
private boolean distributed = false;
|
||||
|
||||
private String keytab;
|
||||
private String principal;
|
||||
private boolean distributeKeytab = false;
|
||||
|
||||
@Override
|
||||
public void updateParametersByParsedCommandline(CommandLine parsedCommandLine,
|
||||
Options options, ClientContext clientContext)
|
||||
@ -85,6 +89,12 @@ public void updateParametersByParsedCommandline(CommandLine parsedCommandLine,
|
||||
+ "please double check.");
|
||||
}
|
||||
|
||||
String kerberosKeytab = parsedCommandLine.getOptionValue(
|
||||
CliConstants.KEYTAB);
|
||||
String kerberosPrincipal = parsedCommandLine.getOptionValue(
|
||||
CliConstants.PRINCIPAL);
|
||||
CliUtils.doLoginIfSecure(kerberosKeytab, kerberosPrincipal);
|
||||
|
||||
workerResource = null;
|
||||
if (nWorkers > 0) {
|
||||
String workerResourceStr = parsedCommandLine.getOptionValue(
|
||||
@ -149,10 +159,16 @@ public void updateParametersByParsedCommandline(CommandLine parsedCommandLine,
|
||||
String psLaunchCommand = parsedCommandLine.getOptionValue(
|
||||
CliConstants.PS_LAUNCH_CMD);
|
||||
|
||||
boolean distributeKerberosKeytab = parsedCommandLine.hasOption(CliConstants
|
||||
.DISTRIBUTE_KEYTAB);
|
||||
|
||||
this.setInputPath(input).setCheckpointPath(jobDir).setNumPS(nPS).setNumWorkers(nWorkers)
|
||||
.setPSLaunchCmd(psLaunchCommand).setWorkerLaunchCmd(workerLaunchCmd)
|
||||
.setPsResource(psResource)
|
||||
.setTensorboardEnabled(tensorboard);
|
||||
.setTensorboardEnabled(tensorboard)
|
||||
.setKeytab(kerberosKeytab)
|
||||
.setPrincipal(kerberosPrincipal)
|
||||
.setDistributeKeytab(distributeKerberosKeytab);
|
||||
|
||||
super.updateParametersByParsedCommandline(parsedCommandLine,
|
||||
options, clientContext);
|
||||
@ -271,4 +287,32 @@ public String getTensorboardDockerImage() {
|
||||
public List<Quicklink> getQuicklinks() {
|
||||
return quicklinks;
|
||||
}
|
||||
|
||||
public String getKeytab() {
|
||||
return keytab;
|
||||
}
|
||||
|
||||
public RunJobParameters setKeytab(String kerberosKeytab) {
|
||||
this.keytab = kerberosKeytab;
|
||||
return this;
|
||||
}
|
||||
|
||||
public String getPrincipal() {
|
||||
return principal;
|
||||
}
|
||||
|
||||
public RunJobParameters setPrincipal(String kerberosPrincipal) {
|
||||
this.principal = kerberosPrincipal;
|
||||
return this;
|
||||
}
|
||||
|
||||
public boolean isDistributeKeytab() {
|
||||
return distributeKeytab;
|
||||
}
|
||||
|
||||
public RunJobParameters setDistributeKeytab(
|
||||
boolean distributeKerberosKeytab) {
|
||||
this.distributeKeytab = distributeKerberosKeytab;
|
||||
return this;
|
||||
}
|
||||
}
|
||||
|
@ -18,13 +18,14 @@
|
||||
import org.apache.hadoop.yarn.client.api.YarnClient;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.submarine.common.conf.SubmarineConfiguration;
|
||||
import org.apache.hadoop.yarn.submarine.common.fs.DefaultRemoteDirectoryManager;
|
||||
import org.apache.hadoop.yarn.submarine.common.fs.RemoteDirectoryManager;
|
||||
import org.apache.hadoop.yarn.submarine.runtimes.RuntimeFactory;
|
||||
|
||||
public class ClientContext {
|
||||
private Configuration yarnConf = new YarnConfiguration();
|
||||
|
||||
private RemoteDirectoryManager remoteDirectoryManager;
|
||||
private volatile RemoteDirectoryManager remoteDirectoryManager;
|
||||
private YarnClient yarnClient;
|
||||
private Configuration submarineConfig;
|
||||
private RuntimeFactory runtimeFactory;
|
||||
@ -51,14 +52,16 @@ public void setConfiguration(Configuration conf) {
|
||||
}
|
||||
|
||||
public RemoteDirectoryManager getRemoteDirectoryManager() {
|
||||
if(remoteDirectoryManager == null) {
|
||||
synchronized (this) {
|
||||
if(remoteDirectoryManager == null) {
|
||||
remoteDirectoryManager = new DefaultRemoteDirectoryManager(this);
|
||||
}
|
||||
}
|
||||
}
|
||||
return remoteDirectoryManager;
|
||||
}
|
||||
|
||||
public void setRemoteDirectoryManager(
|
||||
RemoteDirectoryManager remoteDirectoryManager) {
|
||||
this.remoteDirectoryManager = remoteDirectoryManager;
|
||||
}
|
||||
|
||||
public Configuration getSubmarineConfig() {
|
||||
return submarineConfig;
|
||||
}
|
||||
|
@ -84,7 +84,8 @@ public Path getUserRootFolder() throws IOException {
|
||||
}
|
||||
|
||||
private Path getJobRootFolder(String jobName) throws IOException {
|
||||
Path jobRootPath = getUserRootFolder();
|
||||
Path userRoot = getUserRootFolder();
|
||||
Path jobRootPath = new Path(userRoot, jobName);
|
||||
createFolderIfNotExist(jobRootPath);
|
||||
// Get a file status to make sure it is a absolute path.
|
||||
FileStatus fStatus = fs.getFileStatus(jobRootPath);
|
||||
|
@ -15,9 +15,11 @@
|
||||
package org.apache.hadoop.yarn.submarine.runtimes.yarnservice;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.client.api.AppAdminClient;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
@ -28,6 +30,7 @@
|
||||
import org.apache.hadoop.yarn.service.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.service.api.records.ResourceInformation;
|
||||
import org.apache.hadoop.yarn.service.api.records.Service;
|
||||
import org.apache.hadoop.yarn.service.api.records.KerberosPrincipal;
|
||||
import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
|
||||
import org.apache.hadoop.yarn.submarine.client.cli.param.Quicklink;
|
||||
import org.apache.hadoop.yarn.submarine.client.cli.param.RunJobParameters;
|
||||
@ -297,8 +300,26 @@ private File findFileOnClassPath(final String fileName) {
|
||||
private void uploadToRemoteFileAndLocalizeToContainerWorkDir(Path stagingDir,
|
||||
String fileToUpload, String destFilename, Component comp)
|
||||
throws IOException {
|
||||
Path uploadedFilePath = uploadToRemoteFile(stagingDir, fileToUpload);
|
||||
locateRemoteFileToContainerWorkDir(destFilename, comp, uploadedFilePath);
|
||||
}
|
||||
|
||||
private void locateRemoteFileToContainerWorkDir(String destFilename,
|
||||
Component comp, Path uploadedFilePath) throws IOException {
|
||||
FileSystem fs = FileSystem.get(clientContext.getYarnConfig());
|
||||
|
||||
FileStatus fileStatus = fs.getFileStatus(uploadedFilePath);
|
||||
LOG.info("Uploaded file path = " + fileStatus.getPath());
|
||||
|
||||
// Set it to component's files list
|
||||
comp.getConfiguration().getFiles().add(new ConfigFile().srcFile(
|
||||
fileStatus.getPath().toUri().toString()).destFile(destFilename)
|
||||
.type(ConfigFile.TypeEnum.STATIC));
|
||||
}
|
||||
|
||||
private Path uploadToRemoteFile(Path stagingDir, String fileToUpload) throws
|
||||
IOException {
|
||||
FileSystem fs = FileSystem.get(clientContext.getYarnConfig());
|
||||
// Upload to remote FS under staging area
|
||||
File localFile = new File(fileToUpload);
|
||||
if (!localFile.exists()) {
|
||||
@ -317,14 +338,13 @@ private void uploadToRemoteFileAndLocalizeToContainerWorkDir(Path stagingDir,
|
||||
fs.copyFromLocalFile(new Path(fileToUpload), uploadedFilePath);
|
||||
uploadedFiles.add(uploadedFilePath);
|
||||
}
|
||||
return uploadedFilePath;
|
||||
}
|
||||
|
||||
FileStatus fileStatus = fs.getFileStatus(uploadedFilePath);
|
||||
LOG.info("Uploaded file path = " + fileStatus.getPath());
|
||||
|
||||
// Set it to component's files list
|
||||
comp.getConfiguration().getFiles().add(new ConfigFile().srcFile(
|
||||
fileStatus.getPath().toUri().toString()).destFile(destFilename)
|
||||
.type(ConfigFile.TypeEnum.STATIC));
|
||||
private void setPermission(Path destPath, FsPermission permission) throws
|
||||
IOException {
|
||||
FileSystem fs = FileSystem.get(clientContext.getYarnConfig());
|
||||
fs.setPermission(destPath, new FsPermission(permission));
|
||||
}
|
||||
|
||||
private void handleLaunchCommand(RunJobParameters parameters,
|
||||
@ -472,6 +492,7 @@ private Service createServiceByParameters(RunJobParameters parameters)
|
||||
serviceSpec.setName(parameters.getName());
|
||||
serviceSpec.setVersion(String.valueOf(System.currentTimeMillis()));
|
||||
serviceSpec.setArtifact(getDockerArtifact(parameters.getDockerImageName()));
|
||||
handleKerberosPrincipal(parameters);
|
||||
|
||||
handleServiceEnvs(serviceSpec, parameters);
|
||||
|
||||
@ -544,6 +565,32 @@ private String generateServiceSpecFile(Service service) throws IOException {
|
||||
return serviceSpecFile.getAbsolutePath();
|
||||
}
|
||||
|
||||
private void handleKerberosPrincipal(RunJobParameters parameters) throws
|
||||
IOException {
|
||||
if(StringUtils.isNotBlank(parameters.getKeytab()) && StringUtils
|
||||
.isNotBlank(parameters.getPrincipal())) {
|
||||
String keytab = parameters.getKeytab();
|
||||
String principal = parameters.getPrincipal();
|
||||
if(parameters.isDistributeKeytab()) {
|
||||
Path stagingDir =
|
||||
clientContext.getRemoteDirectoryManager().getJobStagingArea(
|
||||
parameters.getName(), true);
|
||||
Path remoteKeytabPath = uploadToRemoteFile(stagingDir, keytab);
|
||||
//only the owner has read access
|
||||
setPermission(remoteKeytabPath,
|
||||
FsPermission.createImmutable((short)Integer.parseInt("400", 8)));
|
||||
serviceSpec.setKerberosPrincipal(new KerberosPrincipal().keytab(
|
||||
remoteKeytabPath.toString()).principalName(principal));
|
||||
} else {
|
||||
if(!keytab.startsWith("file")) {
|
||||
keytab = "file://" + keytab;
|
||||
}
|
||||
serviceSpec.setKerberosPrincipal(new KerberosPrincipal().keytab(
|
||||
keytab).principalName(principal));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
|
@ -92,7 +92,9 @@ public void testBasicRunJobForDistributedTraining() throws Exception {
|
||||
"--num_workers", "3", "--num_ps", "2", "--worker_launch_cmd",
|
||||
"python run-job.py", "--worker_resources", "memory=2048M,vcores=2",
|
||||
"--ps_resources", "memory=4G,vcores=4", "--tensorboard", "true",
|
||||
"--ps_launch_cmd", "python run-ps.py", "--verbose" });
|
||||
"--ps_launch_cmd", "python run-ps.py", "--keytab", "/keytab/path",
|
||||
"--principal", "user/_HOST@domain.com", "--distribute_keytab",
|
||||
"--verbose" });
|
||||
|
||||
RunJobParameters jobRunParameters = runJobCli.getRunJobParameters();
|
||||
|
||||
@ -108,6 +110,11 @@ public void testBasicRunJobForDistributedTraining() throws Exception {
|
||||
jobRunParameters.getWorkerResource());
|
||||
Assert.assertEquals(jobRunParameters.getDockerImageName(),
|
||||
"tf-docker:1.1.0");
|
||||
Assert.assertEquals(jobRunParameters.getKeytab(),
|
||||
"/keytab/path");
|
||||
Assert.assertEquals(jobRunParameters.getPrincipal(),
|
||||
"user/_HOST@domain.com");
|
||||
Assert.assertTrue(jobRunParameters.isDistributeKeytab());
|
||||
Assert.assertTrue(SubmarineLogs.isVerbose());
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user