MAPREDUCE-5475. Ensure MRClientService verifies ACLs for users. Contributed by Jason Lowe.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1516361 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
ded91b4cfa
commit
148bf3ea4e
@ -234,6 +234,9 @@ Release 2.1.1-beta - UNRELEASED
|
||||
MAPREDUCE-5468. Fix MR AM recovery for map-only jobs. (vinodkv via
|
||||
acmurthy)
|
||||
|
||||
MAPREDUCE-5475. Ensure MRClientService verifies ACLs for users. (jlowe via
|
||||
acmurthy)
|
||||
|
||||
Release 2.1.0-beta - 2013-08-22
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
@ -28,6 +28,7 @@
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||
import org.apache.hadoop.ipc.Server;
|
||||
import org.apache.hadoop.mapreduce.JobACL;
|
||||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||
import org.apache.hadoop.mapreduce.TypeConverter;
|
||||
import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
|
||||
@ -78,6 +79,8 @@
|
||||
import org.apache.hadoop.mapreduce.v2.app.security.authorize.MRAMPolicyProvider;
|
||||
import org.apache.hadoop.mapreduce.v2.app.webapp.AMWebApp;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.security.AccessControlException;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.authorize.PolicyProvider;
|
||||
import org.apache.hadoop.service.AbstractService;
|
||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||
@ -176,15 +179,21 @@ public InetSocketAddress getConnectAddress() {
|
||||
}
|
||||
|
||||
private Job verifyAndGetJob(JobId jobID,
|
||||
boolean modifyAccess) throws IOException {
|
||||
JobACL accessType) throws IOException {
|
||||
Job job = appContext.getJob(jobID);
|
||||
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
|
||||
if (!job.checkAccess(ugi, accessType)) {
|
||||
throw new AccessControlException("User " + ugi.getShortUserName()
|
||||
+ " cannot perform operation " + accessType.name() + " on "
|
||||
+ jobID);
|
||||
}
|
||||
return job;
|
||||
}
|
||||
|
||||
private Task verifyAndGetTask(TaskId taskID,
|
||||
boolean modifyAccess) throws IOException {
|
||||
JobACL accessType) throws IOException {
|
||||
Task task = verifyAndGetJob(taskID.getJobId(),
|
||||
modifyAccess).getTask(taskID);
|
||||
accessType).getTask(taskID);
|
||||
if (task == null) {
|
||||
throw new IOException("Unknown Task " + taskID);
|
||||
}
|
||||
@ -192,9 +201,9 @@ private Task verifyAndGetTask(TaskId taskID,
|
||||
}
|
||||
|
||||
private TaskAttempt verifyAndGetAttempt(TaskAttemptId attemptID,
|
||||
boolean modifyAccess) throws IOException {
|
||||
JobACL accessType) throws IOException {
|
||||
TaskAttempt attempt = verifyAndGetTask(attemptID.getTaskId(),
|
||||
modifyAccess).getAttempt(attemptID);
|
||||
accessType).getAttempt(attemptID);
|
||||
if (attempt == null) {
|
||||
throw new IOException("Unknown TaskAttempt " + attemptID);
|
||||
}
|
||||
@ -205,7 +214,7 @@ private TaskAttempt verifyAndGetAttempt(TaskAttemptId attemptID,
|
||||
public GetCountersResponse getCounters(GetCountersRequest request)
|
||||
throws IOException {
|
||||
JobId jobId = request.getJobId();
|
||||
Job job = verifyAndGetJob(jobId, false);
|
||||
Job job = verifyAndGetJob(jobId, JobACL.VIEW_JOB);
|
||||
GetCountersResponse response =
|
||||
recordFactory.newRecordInstance(GetCountersResponse.class);
|
||||
response.setCounters(TypeConverter.toYarn(job.getAllCounters()));
|
||||
@ -216,7 +225,7 @@ public GetCountersResponse getCounters(GetCountersRequest request)
|
||||
public GetJobReportResponse getJobReport(GetJobReportRequest request)
|
||||
throws IOException {
|
||||
JobId jobId = request.getJobId();
|
||||
Job job = verifyAndGetJob(jobId, false);
|
||||
Job job = verifyAndGetJob(jobId, JobACL.VIEW_JOB);
|
||||
GetJobReportResponse response =
|
||||
recordFactory.newRecordInstance(GetJobReportResponse.class);
|
||||
if (job != null) {
|
||||
@ -235,7 +244,7 @@ public GetTaskAttemptReportResponse getTaskAttemptReport(
|
||||
GetTaskAttemptReportResponse response =
|
||||
recordFactory.newRecordInstance(GetTaskAttemptReportResponse.class);
|
||||
response.setTaskAttemptReport(
|
||||
verifyAndGetAttempt(taskAttemptId, false).getReport());
|
||||
verifyAndGetAttempt(taskAttemptId, JobACL.VIEW_JOB).getReport());
|
||||
return response;
|
||||
}
|
||||
|
||||
@ -245,7 +254,8 @@ public GetTaskReportResponse getTaskReport(GetTaskReportRequest request)
|
||||
TaskId taskId = request.getTaskId();
|
||||
GetTaskReportResponse response =
|
||||
recordFactory.newRecordInstance(GetTaskReportResponse.class);
|
||||
response.setTaskReport(verifyAndGetTask(taskId, false).getReport());
|
||||
response.setTaskReport(
|
||||
verifyAndGetTask(taskId, JobACL.VIEW_JOB).getReport());
|
||||
return response;
|
||||
}
|
||||
|
||||
@ -256,7 +266,7 @@ public GetTaskAttemptCompletionEventsResponse getTaskAttemptCompletionEvents(
|
||||
JobId jobId = request.getJobId();
|
||||
int fromEventId = request.getFromEventId();
|
||||
int maxEvents = request.getMaxEvents();
|
||||
Job job = verifyAndGetJob(jobId, false);
|
||||
Job job = verifyAndGetJob(jobId, JobACL.VIEW_JOB);
|
||||
|
||||
GetTaskAttemptCompletionEventsResponse response =
|
||||
recordFactory.newRecordInstance(GetTaskAttemptCompletionEventsResponse.class);
|
||||
@ -270,9 +280,11 @@ public GetTaskAttemptCompletionEventsResponse getTaskAttemptCompletionEvents(
|
||||
public KillJobResponse killJob(KillJobRequest request)
|
||||
throws IOException {
|
||||
JobId jobId = request.getJobId();
|
||||
String message = "Kill Job received from client " + jobId;
|
||||
UserGroupInformation callerUGI = UserGroupInformation.getCurrentUser();
|
||||
String message = "Kill job " + jobId + " received from " + callerUGI
|
||||
+ " at " + Server.getRemoteAddress();
|
||||
LOG.info(message);
|
||||
verifyAndGetJob(jobId, true);
|
||||
verifyAndGetJob(jobId, JobACL.MODIFY_JOB);
|
||||
appContext.getEventHandler().handle(
|
||||
new JobDiagnosticsUpdateEvent(jobId, message));
|
||||
appContext.getEventHandler().handle(
|
||||
@ -287,9 +299,11 @@ public KillJobResponse killJob(KillJobRequest request)
|
||||
public KillTaskResponse killTask(KillTaskRequest request)
|
||||
throws IOException {
|
||||
TaskId taskId = request.getTaskId();
|
||||
String message = "Kill task received from client " + taskId;
|
||||
UserGroupInformation callerUGI = UserGroupInformation.getCurrentUser();
|
||||
String message = "Kill task " + taskId + " received from " + callerUGI
|
||||
+ " at " + Server.getRemoteAddress();
|
||||
LOG.info(message);
|
||||
verifyAndGetTask(taskId, true);
|
||||
verifyAndGetTask(taskId, JobACL.MODIFY_JOB);
|
||||
appContext.getEventHandler().handle(
|
||||
new TaskEvent(taskId, TaskEventType.T_KILL));
|
||||
KillTaskResponse response =
|
||||
@ -302,9 +316,12 @@ public KillTaskResponse killTask(KillTaskRequest request)
|
||||
public KillTaskAttemptResponse killTaskAttempt(
|
||||
KillTaskAttemptRequest request) throws IOException {
|
||||
TaskAttemptId taskAttemptId = request.getTaskAttemptId();
|
||||
String message = "Kill task attempt received from client " + taskAttemptId;
|
||||
UserGroupInformation callerUGI = UserGroupInformation.getCurrentUser();
|
||||
String message = "Kill task attempt " + taskAttemptId
|
||||
+ " received from " + callerUGI + " at "
|
||||
+ Server.getRemoteAddress();
|
||||
LOG.info(message);
|
||||
verifyAndGetAttempt(taskAttemptId, true);
|
||||
verifyAndGetAttempt(taskAttemptId, JobACL.MODIFY_JOB);
|
||||
appContext.getEventHandler().handle(
|
||||
new TaskAttemptDiagnosticsUpdateEvent(taskAttemptId, message));
|
||||
appContext.getEventHandler().handle(
|
||||
@ -322,8 +339,8 @@ public GetDiagnosticsResponse getDiagnostics(
|
||||
|
||||
GetDiagnosticsResponse response =
|
||||
recordFactory.newRecordInstance(GetDiagnosticsResponse.class);
|
||||
response.addAllDiagnostics(
|
||||
verifyAndGetAttempt(taskAttemptId, false).getDiagnostics());
|
||||
response.addAllDiagnostics(verifyAndGetAttempt(taskAttemptId,
|
||||
JobACL.VIEW_JOB).getDiagnostics());
|
||||
return response;
|
||||
}
|
||||
|
||||
@ -332,9 +349,12 @@ public GetDiagnosticsResponse getDiagnostics(
|
||||
public FailTaskAttemptResponse failTaskAttempt(
|
||||
FailTaskAttemptRequest request) throws IOException {
|
||||
TaskAttemptId taskAttemptId = request.getTaskAttemptId();
|
||||
String message = "Fail task attempt received from client " + taskAttemptId;
|
||||
UserGroupInformation callerUGI = UserGroupInformation.getCurrentUser();
|
||||
String message = "Fail task attempt " + taskAttemptId
|
||||
+ " received from " + callerUGI + " at "
|
||||
+ Server.getRemoteAddress();
|
||||
LOG.info(message);
|
||||
verifyAndGetAttempt(taskAttemptId, true);
|
||||
verifyAndGetAttempt(taskAttemptId, JobACL.MODIFY_JOB);
|
||||
appContext.getEventHandler().handle(
|
||||
new TaskAttemptDiagnosticsUpdateEvent(taskAttemptId, message));
|
||||
appContext.getEventHandler().handle(
|
||||
@ -356,7 +376,7 @@ public GetTaskReportsResponse getTaskReports(
|
||||
GetTaskReportsResponse response =
|
||||
recordFactory.newRecordInstance(GetTaskReportsResponse.class);
|
||||
|
||||
Job job = verifyAndGetJob(jobId, false);
|
||||
Job job = verifyAndGetJob(jobId, JobACL.VIEW_JOB);
|
||||
Collection<Task> tasks = job.getTasks(taskType).values();
|
||||
LOG.info("Getting task report for " + taskType + " " + jobId
|
||||
+ ". Report-size will be " + tasks.size());
|
||||
|
@ -18,13 +18,20 @@
|
||||
|
||||
package org.apache.hadoop.mapreduce.v2.app;
|
||||
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
|
||||
import junit.framework.Assert;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.mapreduce.JobACL;
|
||||
import org.apache.hadoop.mapreduce.MRConfig;
|
||||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||
import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
|
||||
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptRequest;
|
||||
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersRequest;
|
||||
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDiagnosticsRequest;
|
||||
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportRequest;
|
||||
@ -32,6 +39,9 @@
|
||||
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptReportRequest;
|
||||
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportRequest;
|
||||
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportsRequest;
|
||||
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillJobRequest;
|
||||
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskAttemptRequest;
|
||||
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskRequest;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
|
||||
@ -51,6 +61,8 @@
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
|
||||
import org.apache.hadoop.security.AccessControlException;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
||||
@ -169,6 +181,79 @@ public void test() throws Exception {
|
||||
app.waitForState(job, JobState.SUCCEEDED);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testViewAclOnlyCannotModify() throws Exception {
|
||||
final MRAppWithClientService app = new MRAppWithClientService(1, 0, false);
|
||||
final Configuration conf = new Configuration();
|
||||
conf.setBoolean(MRConfig.MR_ACLS_ENABLED, true);
|
||||
conf.set(MRJobConfig.JOB_ACL_VIEW_JOB, "viewonlyuser");
|
||||
Job job = app.submit(conf);
|
||||
app.waitForState(job, JobState.RUNNING);
|
||||
Assert.assertEquals("Num tasks not correct", 1, job.getTasks().size());
|
||||
Iterator<Task> it = job.getTasks().values().iterator();
|
||||
Task task = it.next();
|
||||
app.waitForState(task, TaskState.RUNNING);
|
||||
TaskAttempt attempt = task.getAttempts().values().iterator().next();
|
||||
app.waitForState(attempt, TaskAttemptState.RUNNING);
|
||||
|
||||
UserGroupInformation viewOnlyUser =
|
||||
UserGroupInformation.createUserForTesting(
|
||||
"viewonlyuser", new String[] {});
|
||||
Assert.assertTrue("viewonlyuser cannot view job",
|
||||
job.checkAccess(viewOnlyUser, JobACL.VIEW_JOB));
|
||||
Assert.assertFalse("viewonlyuser can modify job",
|
||||
job.checkAccess(viewOnlyUser, JobACL.MODIFY_JOB));
|
||||
MRClientProtocol client = viewOnlyUser.doAs(
|
||||
new PrivilegedExceptionAction<MRClientProtocol>() {
|
||||
@Override
|
||||
public MRClientProtocol run() throws Exception {
|
||||
YarnRPC rpc = YarnRPC.create(conf);
|
||||
return (MRClientProtocol) rpc.getProxy(MRClientProtocol.class,
|
||||
app.clientService.getBindAddress(), conf);
|
||||
}
|
||||
});
|
||||
|
||||
KillJobRequest killJobRequest = recordFactory.newRecordInstance(
|
||||
KillJobRequest.class);
|
||||
killJobRequest.setJobId(app.getJobId());
|
||||
try {
|
||||
client.killJob(killJobRequest);
|
||||
fail("viewonlyuser killed job");
|
||||
} catch (AccessControlException e) {
|
||||
// pass
|
||||
}
|
||||
|
||||
KillTaskRequest killTaskRequest = recordFactory.newRecordInstance(
|
||||
KillTaskRequest.class);
|
||||
killTaskRequest.setTaskId(task.getID());
|
||||
try {
|
||||
client.killTask(killTaskRequest);
|
||||
fail("viewonlyuser killed task");
|
||||
} catch (AccessControlException e) {
|
||||
// pass
|
||||
}
|
||||
|
||||
KillTaskAttemptRequest killTaskAttemptRequest =
|
||||
recordFactory.newRecordInstance(KillTaskAttemptRequest.class);
|
||||
killTaskAttemptRequest.setTaskAttemptId(attempt.getID());
|
||||
try {
|
||||
client.killTaskAttempt(killTaskAttemptRequest);
|
||||
fail("viewonlyuser killed task attempt");
|
||||
} catch (AccessControlException e) {
|
||||
// pass
|
||||
}
|
||||
|
||||
FailTaskAttemptRequest failTaskAttemptRequest =
|
||||
recordFactory.newRecordInstance(FailTaskAttemptRequest.class);
|
||||
failTaskAttemptRequest.setTaskAttemptId(attempt.getID());
|
||||
try {
|
||||
client.failTaskAttempt(failTaskAttemptRequest);
|
||||
fail("viewonlyuser killed task attempt");
|
||||
} catch (AccessControlException e) {
|
||||
// pass
|
||||
}
|
||||
}
|
||||
|
||||
private void verifyJobReport(JobReport jr) {
|
||||
Assert.assertNotNull("JobReport is null", jr);
|
||||
List<AMInfo> amInfos = jr.getAMInfos();
|
||||
|
Loading…
Reference in New Issue
Block a user