diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index d0345594ac..6d3e6d0303 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -234,6 +234,9 @@ Release 2.1.1-beta - UNRELEASED MAPREDUCE-5468. Fix MR AM recovery for map-only jobs. (vinodkv via acmurthy) + MAPREDUCE-5475. Ensure MRClientService verifies ACLs for users. (jlowe via + acmurthy) + Release 2.1.0-beta - 2013-08-22 INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java index 4bb39696e1..d36bf62fdf 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java @@ -28,6 +28,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.mapreduce.JobACL; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol; @@ -78,6 +79,8 @@ import org.apache.hadoop.mapreduce.v2.app.security.authorize.MRAMPolicyProvider; import org.apache.hadoop.mapreduce.v2.app.webapp.AMWebApp; import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.AccessControlException; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.PolicyProvider; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.factories.RecordFactory; @@ -175,16 +178,22 @@ public InetSocketAddress getConnectAddress() { return getBindAddress(); } - private Job verifyAndGetJob(JobId jobID, - boolean modifyAccess) throws IOException { + private Job verifyAndGetJob(JobId jobID, + JobACL accessType) throws IOException { Job job = appContext.getJob(jobID); + UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); + if (!job.checkAccess(ugi, accessType)) { + throw new AccessControlException("User " + ugi.getShortUserName() + + " cannot perform operation " + accessType.name() + " on " + + jobID); + } return job; } private Task verifyAndGetTask(TaskId taskID, - boolean modifyAccess) throws IOException { + JobACL accessType) throws IOException { Task task = verifyAndGetJob(taskID.getJobId(), - modifyAccess).getTask(taskID); + accessType).getTask(taskID); if (task == null) { throw new IOException("Unknown Task " + taskID); } @@ -192,9 +201,9 @@ private Task verifyAndGetTask(TaskId taskID, } private TaskAttempt verifyAndGetAttempt(TaskAttemptId attemptID, - boolean modifyAccess) throws IOException { + JobACL accessType) throws IOException { TaskAttempt attempt = verifyAndGetTask(attemptID.getTaskId(), - modifyAccess).getAttempt(attemptID); + accessType).getAttempt(attemptID); if (attempt == null) { throw new IOException("Unknown TaskAttempt " + attemptID); } @@ -205,7 +214,7 @@ private TaskAttempt verifyAndGetAttempt(TaskAttemptId attemptID, public GetCountersResponse getCounters(GetCountersRequest request) throws IOException { JobId jobId = request.getJobId(); - Job job = verifyAndGetJob(jobId, false); + Job job = verifyAndGetJob(jobId, JobACL.VIEW_JOB); GetCountersResponse response = recordFactory.newRecordInstance(GetCountersResponse.class); response.setCounters(TypeConverter.toYarn(job.getAllCounters())); @@ -216,7 +225,7 @@ public GetCountersResponse getCounters(GetCountersRequest request) public GetJobReportResponse getJobReport(GetJobReportRequest request) throws IOException { JobId jobId = request.getJobId(); - Job job = verifyAndGetJob(jobId, false); + Job job = verifyAndGetJob(jobId, JobACL.VIEW_JOB); GetJobReportResponse response = recordFactory.newRecordInstance(GetJobReportResponse.class); if (job != null) { @@ -235,7 +244,7 @@ public GetTaskAttemptReportResponse getTaskAttemptReport( GetTaskAttemptReportResponse response = recordFactory.newRecordInstance(GetTaskAttemptReportResponse.class); response.setTaskAttemptReport( - verifyAndGetAttempt(taskAttemptId, false).getReport()); + verifyAndGetAttempt(taskAttemptId, JobACL.VIEW_JOB).getReport()); return response; } @@ -245,7 +254,8 @@ public GetTaskReportResponse getTaskReport(GetTaskReportRequest request) TaskId taskId = request.getTaskId(); GetTaskReportResponse response = recordFactory.newRecordInstance(GetTaskReportResponse.class); - response.setTaskReport(verifyAndGetTask(taskId, false).getReport()); + response.setTaskReport( + verifyAndGetTask(taskId, JobACL.VIEW_JOB).getReport()); return response; } @@ -256,7 +266,7 @@ public GetTaskAttemptCompletionEventsResponse getTaskAttemptCompletionEvents( JobId jobId = request.getJobId(); int fromEventId = request.getFromEventId(); int maxEvents = request.getMaxEvents(); - Job job = verifyAndGetJob(jobId, false); + Job job = verifyAndGetJob(jobId, JobACL.VIEW_JOB); GetTaskAttemptCompletionEventsResponse response = recordFactory.newRecordInstance(GetTaskAttemptCompletionEventsResponse.class); @@ -270,9 +280,11 @@ public GetTaskAttemptCompletionEventsResponse getTaskAttemptCompletionEvents( public KillJobResponse killJob(KillJobRequest request) throws IOException { JobId jobId = request.getJobId(); - String message = "Kill Job received from client " + jobId; + UserGroupInformation callerUGI = UserGroupInformation.getCurrentUser(); + String message = "Kill job " + jobId + " received from " + callerUGI + + " at " + Server.getRemoteAddress(); LOG.info(message); - verifyAndGetJob(jobId, true); + verifyAndGetJob(jobId, JobACL.MODIFY_JOB); appContext.getEventHandler().handle( new JobDiagnosticsUpdateEvent(jobId, message)); appContext.getEventHandler().handle( @@ -287,9 +299,11 @@ public KillJobResponse killJob(KillJobRequest request) public KillTaskResponse killTask(KillTaskRequest request) throws IOException { TaskId taskId = request.getTaskId(); - String message = "Kill task received from client " + taskId; + UserGroupInformation callerUGI = UserGroupInformation.getCurrentUser(); + String message = "Kill task " + taskId + " received from " + callerUGI + + " at " + Server.getRemoteAddress(); LOG.info(message); - verifyAndGetTask(taskId, true); + verifyAndGetTask(taskId, JobACL.MODIFY_JOB); appContext.getEventHandler().handle( new TaskEvent(taskId, TaskEventType.T_KILL)); KillTaskResponse response = @@ -302,9 +316,12 @@ public KillTaskResponse killTask(KillTaskRequest request) public KillTaskAttemptResponse killTaskAttempt( KillTaskAttemptRequest request) throws IOException { TaskAttemptId taskAttemptId = request.getTaskAttemptId(); - String message = "Kill task attempt received from client " + taskAttemptId; + UserGroupInformation callerUGI = UserGroupInformation.getCurrentUser(); + String message = "Kill task attempt " + taskAttemptId + + " received from " + callerUGI + " at " + + Server.getRemoteAddress(); LOG.info(message); - verifyAndGetAttempt(taskAttemptId, true); + verifyAndGetAttempt(taskAttemptId, JobACL.MODIFY_JOB); appContext.getEventHandler().handle( new TaskAttemptDiagnosticsUpdateEvent(taskAttemptId, message)); appContext.getEventHandler().handle( @@ -322,8 +339,8 @@ public GetDiagnosticsResponse getDiagnostics( GetDiagnosticsResponse response = recordFactory.newRecordInstance(GetDiagnosticsResponse.class); - response.addAllDiagnostics( - verifyAndGetAttempt(taskAttemptId, false).getDiagnostics()); + response.addAllDiagnostics(verifyAndGetAttempt(taskAttemptId, + JobACL.VIEW_JOB).getDiagnostics()); return response; } @@ -332,9 +349,12 @@ public GetDiagnosticsResponse getDiagnostics( public FailTaskAttemptResponse failTaskAttempt( FailTaskAttemptRequest request) throws IOException { TaskAttemptId taskAttemptId = request.getTaskAttemptId(); - String message = "Fail task attempt received from client " + taskAttemptId; + UserGroupInformation callerUGI = UserGroupInformation.getCurrentUser(); + String message = "Fail task attempt " + taskAttemptId + + " received from " + callerUGI + " at " + + Server.getRemoteAddress(); LOG.info(message); - verifyAndGetAttempt(taskAttemptId, true); + verifyAndGetAttempt(taskAttemptId, JobACL.MODIFY_JOB); appContext.getEventHandler().handle( new TaskAttemptDiagnosticsUpdateEvent(taskAttemptId, message)); appContext.getEventHandler().handle( @@ -356,7 +376,7 @@ public GetTaskReportsResponse getTaskReports( GetTaskReportsResponse response = recordFactory.newRecordInstance(GetTaskReportsResponse.class); - Job job = verifyAndGetJob(jobId, false); + Job job = verifyAndGetJob(jobId, JobACL.VIEW_JOB); Collection tasks = job.getTasks(taskType).values(); LOG.info("Getting task report for " + taskType + " " + jobId + ". Report-size will be " + tasks.size()); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRClientService.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRClientService.java index 34b8dc7635..b17b8ce7ad 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRClientService.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRClientService.java @@ -18,13 +18,20 @@ package org.apache.hadoop.mapreduce.v2.app; +import static org.junit.Assert.fail; + +import java.security.PrivilegedExceptionAction; import java.util.Iterator; import java.util.List; import junit.framework.Assert; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.JobACL; +import org.apache.hadoop.mapreduce.MRConfig; +import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol; +import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptRequest; import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersRequest; import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDiagnosticsRequest; import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportRequest; @@ -32,6 +39,9 @@ import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptReportRequest; import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportRequest; import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportsRequest; +import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillJobRequest; +import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskAttemptRequest; +import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskRequest; import org.apache.hadoop.mapreduce.v2.api.records.AMInfo; import org.apache.hadoop.mapreduce.v2.api.records.JobReport; import org.apache.hadoop.mapreduce.v2.api.records.JobState; @@ -51,6 +61,8 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus; +import org.apache.hadoop.security.AccessControlException; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.ipc.YarnRPC; @@ -169,6 +181,79 @@ public void test() throws Exception { app.waitForState(job, JobState.SUCCEEDED); } + @Test + public void testViewAclOnlyCannotModify() throws Exception { + final MRAppWithClientService app = new MRAppWithClientService(1, 0, false); + final Configuration conf = new Configuration(); + conf.setBoolean(MRConfig.MR_ACLS_ENABLED, true); + conf.set(MRJobConfig.JOB_ACL_VIEW_JOB, "viewonlyuser"); + Job job = app.submit(conf); + app.waitForState(job, JobState.RUNNING); + Assert.assertEquals("Num tasks not correct", 1, job.getTasks().size()); + Iterator it = job.getTasks().values().iterator(); + Task task = it.next(); + app.waitForState(task, TaskState.RUNNING); + TaskAttempt attempt = task.getAttempts().values().iterator().next(); + app.waitForState(attempt, TaskAttemptState.RUNNING); + + UserGroupInformation viewOnlyUser = + UserGroupInformation.createUserForTesting( + "viewonlyuser", new String[] {}); + Assert.assertTrue("viewonlyuser cannot view job", + job.checkAccess(viewOnlyUser, JobACL.VIEW_JOB)); + Assert.assertFalse("viewonlyuser can modify job", + job.checkAccess(viewOnlyUser, JobACL.MODIFY_JOB)); + MRClientProtocol client = viewOnlyUser.doAs( + new PrivilegedExceptionAction() { + @Override + public MRClientProtocol run() throws Exception { + YarnRPC rpc = YarnRPC.create(conf); + return (MRClientProtocol) rpc.getProxy(MRClientProtocol.class, + app.clientService.getBindAddress(), conf); + } + }); + + KillJobRequest killJobRequest = recordFactory.newRecordInstance( + KillJobRequest.class); + killJobRequest.setJobId(app.getJobId()); + try { + client.killJob(killJobRequest); + fail("viewonlyuser killed job"); + } catch (AccessControlException e) { + // pass + } + + KillTaskRequest killTaskRequest = recordFactory.newRecordInstance( + KillTaskRequest.class); + killTaskRequest.setTaskId(task.getID()); + try { + client.killTask(killTaskRequest); + fail("viewonlyuser killed task"); + } catch (AccessControlException e) { + // pass + } + + KillTaskAttemptRequest killTaskAttemptRequest = + recordFactory.newRecordInstance(KillTaskAttemptRequest.class); + killTaskAttemptRequest.setTaskAttemptId(attempt.getID()); + try { + client.killTaskAttempt(killTaskAttemptRequest); + fail("viewonlyuser killed task attempt"); + } catch (AccessControlException e) { + // pass + } + + FailTaskAttemptRequest failTaskAttemptRequest = + recordFactory.newRecordInstance(FailTaskAttemptRequest.class); + failTaskAttemptRequest.setTaskAttemptId(attempt.getID()); + try { + client.failTaskAttempt(failTaskAttemptRequest); + fail("viewonlyuser killed task attempt"); + } catch (AccessControlException e) { + // pass + } + } + private void verifyJobReport(JobReport jr) { Assert.assertNotNull("JobReport is null", jr); List amInfos = jr.getAMInfos();