MAPREDUCE-3103. Implement Job ACLs for MRAppMaster. (mahadev)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1195761 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
8f9661da48
commit
68fa208b1c
@ -1894,6 +1894,9 @@ Release 0.23.0 - Unreleased
|
|||||||
|
|
||||||
MAPREDUCE-3220. Fixed TestCombineOutputCollector. (Devaraj K via acmurthy)
|
MAPREDUCE-3220. Fixed TestCombineOutputCollector. (Devaraj K via acmurthy)
|
||||||
|
|
||||||
|
MAPREDUCE-3103. Implement Job ACLs for MRAppMaster.
|
||||||
|
(mahadev)
|
||||||
|
|
||||||
Release 0.22.0 - Unreleased
|
Release 0.22.0 - Unreleased
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
@ -18,11 +18,9 @@
|
|||||||
|
|
||||||
package org.apache.hadoop.mapreduce.v2.app.client;
|
package org.apache.hadoop.mapreduce.v2.app.client;
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.net.InetAddress;
|
import java.net.InetAddress;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.net.UnknownHostException;
|
import java.net.UnknownHostException;
|
||||||
import java.security.AccessControlException;
|
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
|
||||||
@ -32,7 +30,6 @@
|
|||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||||
import org.apache.hadoop.ipc.Server;
|
import org.apache.hadoop.ipc.Server;
|
||||||
import org.apache.hadoop.mapreduce.JobACL;
|
|
||||||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||||
import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
|
import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
|
||||||
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptRequest;
|
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptRequest;
|
||||||
@ -196,13 +193,6 @@ private Job verifyAndGetJob(JobId jobID,
|
|||||||
if (job == null) {
|
if (job == null) {
|
||||||
throw RPCUtil.getRemoteException("Unknown job " + jobID);
|
throw RPCUtil.getRemoteException("Unknown job " + jobID);
|
||||||
}
|
}
|
||||||
//TODO fix job acls.
|
|
||||||
//JobACL operation = JobACL.VIEW_JOB;
|
|
||||||
//if (modifyAccess) {
|
|
||||||
// operation = JobACL.MODIFY_JOB;
|
|
||||||
//}
|
|
||||||
//TO disable check access ofr now.
|
|
||||||
//checkAccess(job, operation);
|
|
||||||
return job;
|
return job;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -226,24 +216,6 @@ private TaskAttempt verifyAndGetAttempt(TaskAttemptId attemptID,
|
|||||||
return attempt;
|
return attempt;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void checkAccess(Job job, JobACL jobOperation)
|
|
||||||
throws YarnRemoteException {
|
|
||||||
if (!UserGroupInformation.isSecurityEnabled()) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
UserGroupInformation callerUGI;
|
|
||||||
try {
|
|
||||||
callerUGI = UserGroupInformation.getCurrentUser();
|
|
||||||
} catch (IOException e) {
|
|
||||||
throw RPCUtil.getRemoteException(e);
|
|
||||||
}
|
|
||||||
if(!job.checkAccess(callerUGI, jobOperation)) {
|
|
||||||
throw RPCUtil.getRemoteException(new AccessControlException("User "
|
|
||||||
+ callerUGI.getShortUserName() + " cannot perform operation "
|
|
||||||
+ jobOperation.name() + " on " + job.getID()));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public GetCountersResponse getCounters(GetCountersRequest request)
|
public GetCountersResponse getCounters(GetCountersRequest request)
|
||||||
throws YarnRemoteException {
|
throws YarnRemoteException {
|
||||||
@ -304,6 +276,7 @@ public GetTaskAttemptCompletionEventsResponse getTaskAttemptCompletionEvents(
|
|||||||
return response;
|
return response;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
@Override
|
@Override
|
||||||
public KillJobResponse killJob(KillJobRequest request)
|
public KillJobResponse killJob(KillJobRequest request)
|
||||||
throws YarnRemoteException {
|
throws YarnRemoteException {
|
||||||
@ -320,6 +293,7 @@ public KillJobResponse killJob(KillJobRequest request)
|
|||||||
return response;
|
return response;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
@Override
|
@Override
|
||||||
public KillTaskResponse killTask(KillTaskRequest request)
|
public KillTaskResponse killTask(KillTaskRequest request)
|
||||||
throws YarnRemoteException {
|
throws YarnRemoteException {
|
||||||
@ -334,6 +308,7 @@ public KillTaskResponse killTask(KillTaskRequest request)
|
|||||||
return response;
|
return response;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
@Override
|
@Override
|
||||||
public KillTaskAttemptResponse killTaskAttempt(
|
public KillTaskAttemptResponse killTaskAttempt(
|
||||||
KillTaskAttemptRequest request) throws YarnRemoteException {
|
KillTaskAttemptRequest request) throws YarnRemoteException {
|
||||||
@ -363,6 +338,7 @@ public GetDiagnosticsResponse getDiagnostics(
|
|||||||
return response;
|
return response;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
@Override
|
@Override
|
||||||
public FailTaskAttemptResponse failTaskAttempt(
|
public FailTaskAttemptResponse failTaskAttempt(
|
||||||
FailTaskAttemptRequest request) throws YarnRemoteException {
|
FailTaskAttemptRequest request) throws YarnRemoteException {
|
||||||
|
@ -28,9 +28,12 @@
|
|||||||
|
|
||||||
import org.apache.commons.lang.StringUtils;
|
import org.apache.commons.lang.StringUtils;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.mapreduce.JobACL;
|
||||||
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
|
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
|
||||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
|
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.app.job.Job;
|
||||||
import org.apache.hadoop.mapreduce.v2.util.MRApps;
|
import org.apache.hadoop.mapreduce.v2.util.MRApps;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.util.StringHelper;
|
import org.apache.hadoop.yarn.util.StringHelper;
|
||||||
import org.apache.hadoop.yarn.util.Times;
|
import org.apache.hadoop.yarn.util.Times;
|
||||||
@ -268,6 +271,29 @@ void notFound(String s) {
|
|||||||
setTitle(join("Not found: ", s));
|
setTitle(join("Not found: ", s));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Render a ACCESS_DENIED error.
|
||||||
|
* @param s the error message to include.
|
||||||
|
*/
|
||||||
|
void accessDenied(String s) {
|
||||||
|
setStatus(HttpServletResponse.SC_FORBIDDEN);
|
||||||
|
setTitle(join("Access denied: ", s));
|
||||||
|
throw new RuntimeException("Access denied: " + s);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* check for job access.
|
||||||
|
* @param job the job that is being accessed
|
||||||
|
*/
|
||||||
|
void checkAccess(Job job) {
|
||||||
|
UserGroupInformation callerUgi = UserGroupInformation.createRemoteUser(
|
||||||
|
request().getRemoteUser());
|
||||||
|
if (!job.checkAccess(callerUgi, JobACL.VIEW_JOB)) {
|
||||||
|
accessDenied("User " + request().getRemoteUser() + " does not have " +
|
||||||
|
" permissions.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Ensure that a JOB_ID was passed into the page.
|
* Ensure that a JOB_ID was passed into the page.
|
||||||
*/
|
*/
|
||||||
@ -281,6 +307,9 @@ public void requireJob() {
|
|||||||
if (app.getJob() == null) {
|
if (app.getJob() == null) {
|
||||||
notFound($(JOB_ID));
|
notFound($(JOB_ID));
|
||||||
}
|
}
|
||||||
|
/* check for acl access */
|
||||||
|
Job job = app.context.getJob(jobID);
|
||||||
|
checkAccess(job);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
badRequest(e.getMessage() == null ?
|
badRequest(e.getMessage() == null ?
|
||||||
e.getClass().getName() : e.getMessage());
|
e.getClass().getName() : e.getMessage());
|
||||||
@ -296,7 +325,8 @@ public void requireTask() {
|
|||||||
throw new RuntimeException("missing task ID");
|
throw new RuntimeException("missing task ID");
|
||||||
}
|
}
|
||||||
TaskId taskID = MRApps.toTaskID($(TASK_ID));
|
TaskId taskID = MRApps.toTaskID($(TASK_ID));
|
||||||
app.setJob(app.context.getJob(taskID.getJobId()));
|
Job job = app.context.getJob(taskID.getJobId());
|
||||||
|
app.setJob(job);
|
||||||
if (app.getJob() == null) {
|
if (app.getJob() == null) {
|
||||||
notFound(MRApps.toString(taskID.getJobId()));
|
notFound(MRApps.toString(taskID.getJobId()));
|
||||||
} else {
|
} else {
|
||||||
@ -305,6 +335,7 @@ public void requireTask() {
|
|||||||
notFound($(TASK_ID));
|
notFound($(TASK_ID));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
checkAccess(job);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
badRequest(e.getMessage());
|
badRequest(e.getMessage());
|
||||||
}
|
}
|
||||||
|
@ -74,19 +74,20 @@ public class CompletedJob implements org.apache.hadoop.mapreduce.v2.app.job.Job
|
|||||||
private final Map<TaskId, Task> reduceTasks = new HashMap<TaskId, Task>();
|
private final Map<TaskId, Task> reduceTasks = new HashMap<TaskId, Task>();
|
||||||
private final String user;
|
private final String user;
|
||||||
private final Path confFile;
|
private final Path confFile;
|
||||||
|
private JobACLsManager aclsMgr;
|
||||||
private List<TaskAttemptCompletionEvent> completionEvents = null;
|
private List<TaskAttemptCompletionEvent> completionEvents = null;
|
||||||
private JobInfo jobInfo;
|
private JobInfo jobInfo;
|
||||||
|
|
||||||
public CompletedJob(Configuration conf, JobId jobId, Path historyFile,
|
public CompletedJob(Configuration conf, JobId jobId, Path historyFile,
|
||||||
boolean loadTasks, String userName, Path confFile) throws IOException {
|
boolean loadTasks, String userName, Path confFile, JobACLsManager aclsMgr)
|
||||||
|
throws IOException {
|
||||||
LOG.info("Loading job: " + jobId + " from file: " + historyFile);
|
LOG.info("Loading job: " + jobId + " from file: " + historyFile);
|
||||||
this.conf = conf;
|
this.conf = conf;
|
||||||
this.jobId = jobId;
|
this.jobId = jobId;
|
||||||
this.confFile = confFile;
|
this.confFile = confFile;
|
||||||
|
this.aclsMgr = aclsMgr;
|
||||||
|
|
||||||
loadFullHistoryData(loadTasks, historyFile);
|
loadFullHistoryData(loadTasks, historyFile);
|
||||||
|
|
||||||
user = userName;
|
user = userName;
|
||||||
counters = TypeConverter.toYarn(jobInfo.getTotalCounters());
|
counters = TypeConverter.toYarn(jobInfo.getTotalCounters());
|
||||||
diagnostics.add(jobInfo.getErrorInfo());
|
diagnostics.add(jobInfo.getErrorInfo());
|
||||||
@ -314,7 +315,6 @@ boolean checkAccess(UserGroupInformation callerUGI, JobACL jobOperation) {
|
|||||||
}
|
}
|
||||||
Map<JobACL, AccessControlList> jobACLs = jobInfo.getJobACLs();
|
Map<JobACL, AccessControlList> jobACLs = jobInfo.getJobACLs();
|
||||||
AccessControlList jobACL = jobACLs.get(jobOperation);
|
AccessControlList jobACL = jobACLs.get(jobOperation);
|
||||||
JobACLsManager aclsMgr = new JobACLsManager(conf);
|
|
||||||
return aclsMgr.checkAccess(callerUGI, jobOperation,
|
return aclsMgr.checkAccess(callerUGI, jobOperation,
|
||||||
jobInfo.getUsername(), jobACL);
|
jobInfo.getUsername(), jobACL);
|
||||||
}
|
}
|
||||||
|
@ -48,6 +48,7 @@
|
|||||||
import org.apache.hadoop.fs.PathFilter;
|
import org.apache.hadoop.fs.PathFilter;
|
||||||
import org.apache.hadoop.fs.RemoteIterator;
|
import org.apache.hadoop.fs.RemoteIterator;
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
|
import org.apache.hadoop.mapred.JobACLsManager;
|
||||||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||||
import org.apache.hadoop.mapreduce.TypeConverter;
|
import org.apache.hadoop.mapreduce.TypeConverter;
|
||||||
import org.apache.hadoop.mapreduce.jobhistory.JobSummary;
|
import org.apache.hadoop.mapreduce.jobhistory.JobSummary;
|
||||||
@ -125,6 +126,8 @@ public class JobHistory extends AbstractService implements HistoryContext {
|
|||||||
//The number of jobs to maintain in the job list cache.
|
//The number of jobs to maintain in the job list cache.
|
||||||
private int jobListCacheSize;
|
private int jobListCacheSize;
|
||||||
|
|
||||||
|
private JobACLsManager aclsMgr;
|
||||||
|
|
||||||
//The number of loaded jobs.
|
//The number of loaded jobs.
|
||||||
private int loadedJobCacheSize;
|
private int loadedJobCacheSize;
|
||||||
|
|
||||||
@ -203,7 +206,7 @@ public void init(Configuration conf) throws YarnException {
|
|||||||
+ intermediateDoneDirPath + "]", e);
|
+ intermediateDoneDirPath + "]", e);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
this.aclsMgr = new JobACLsManager(conf);
|
||||||
|
|
||||||
jobListCacheSize = conf.getInt(JHAdminConfig.MR_HISTORY_JOBLIST_CACHE_SIZE,
|
jobListCacheSize = conf.getInt(JHAdminConfig.MR_HISTORY_JOBLIST_CACHE_SIZE,
|
||||||
DEFAULT_JOBLIST_CACHE_SIZE);
|
DEFAULT_JOBLIST_CACHE_SIZE);
|
||||||
@ -648,7 +651,7 @@ private Job loadJob(MetaInfo metaInfo) {
|
|||||||
try {
|
try {
|
||||||
Job job = new CompletedJob(conf, metaInfo.getJobIndexInfo().getJobId(),
|
Job job = new CompletedJob(conf, metaInfo.getJobIndexInfo().getJobId(),
|
||||||
metaInfo.getHistoryFile(), true, metaInfo.getJobIndexInfo().getUser(),
|
metaInfo.getHistoryFile(), true, metaInfo.getJobIndexInfo().getUser(),
|
||||||
metaInfo.getConfFile());
|
metaInfo.getConfFile(), this.aclsMgr);
|
||||||
addToLoadedJobCache(job);
|
addToLoadedJobCache(job);
|
||||||
return job;
|
return job;
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
|
Loading…
Reference in New Issue
Block a user