diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 70a2001f38..5b1999ec90 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -211,6 +211,9 @@ Release 0.23.1 - Unreleased
     MAPREDUCE-3452. fifoscheduler web ui page always shows 0% used for the
     queue. (Jonathan Eagles via mahadev)
 
+    MAPREDUCE-3443. JobClient and Job should function in the context of the
+    UGI which created them. (Mahadev Konar via sseth)
+
 Release 0.23.0 - 2011-11-01
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java
index 0505e33ce5..fc65d46812 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java
@@ -21,6 +21,7 @@
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.URL;
+import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
@@ -42,6 +43,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenRenewer;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
@@ -421,6 +423,11 @@ boolean monitorAndPrintJob() throws IOException, InterruptedException {
   }
 
   Cluster cluster;
+  /**
+   * Ugi of the client. We store this ugi when the client is created and
+   * then make sure that the same ugi is used to run the various protocols.
+   */
+  UserGroupInformation clientUgi;
 
   /**
    * Create a job client.
@@ -458,6 +465,7 @@ public JobClient(Configuration conf) throws IOException {
   public void init(JobConf conf) throws IOException {
     setConf(conf);
     cluster = new Cluster(conf);
+    clientUgi = UserGroupInformation.getCurrentUser();
   }
 
   @InterfaceAudience.Private
@@ -487,8 +495,7 @@ public void cancel(Token<?> token, Configuration conf
     @Override
     public boolean isManaged(Token<?> token) throws IOException {
       return true;
-    }
-    
+    }
   }
 
   /**
@@ -500,6 +507,7 @@ public boolean isManaged(Token<?> token) throws IOException {
   public JobClient(InetSocketAddress jobTrackAddr, 
                    Configuration conf) throws IOException {
     cluster = new Cluster(jobTrackAddr, conf);
+    clientUgi = UserGroupInformation.getCurrentUser();
   }
 
   /**
@@ -562,21 +570,34 @@ public RunningJob submitJob(String jobFile) throws FileNotFoundException,
    * @throws FileNotFoundException
    * @throws IOException
    */
-  public RunningJob submitJob(JobConf conf) throws FileNotFoundException,
+  public RunningJob submitJob(final JobConf conf) throws FileNotFoundException,
                                                   IOException {
     try {
       conf.setBooleanIfUnset("mapred.mapper.new-api", false);
      conf.setBooleanIfUnset("mapred.reducer.new-api", false);
-      Job job = Job.getInstance(conf);
-      job.submit();
+      Job job = clientUgi.doAs(new PrivilegedExceptionAction<Job> () {
+        @Override
+        public Job run() throws IOException, ClassNotFoundException, 
+            InterruptedException {
+          Job job = Job.getInstance(conf);
+          job.submit();
+          return job;
+        }
+      });
       return new NetworkedJob(job);
     } catch (InterruptedException ie) {
       throw new IOException("interrupted", ie);
-    } catch (ClassNotFoundException cnfe) {
-      throw new IOException("class not found", cnfe);
     }
   }
 
+  private Job getJobUsingCluster(final JobID jobid) throws IOException,
+      InterruptedException {
+    return clientUgi.doAs(new PrivilegedExceptionAction<Job>() {
+      public Job run() throws IOException, InterruptedException {
+        return cluster.getJob(jobid);
+      }
+    });
+  }
   /**
    * Get an {@link RunningJob} object to track an ongoing job.  Returns
    * null if the id does not correspond to any known job.
@@ -586,9 +607,10 @@ public RunningJob submitJob(JobConf conf) throws FileNotFoundException,
    * jobid doesn't correspond to any known job.
   * @throws IOException
   */
-  public RunningJob getJob(JobID jobid) throws IOException {
+  public RunningJob getJob(final JobID jobid) throws IOException {
     try {
-      Job job = cluster.getJob(jobid);
+      
+      Job job = getJobUsingCluster(jobid);
       if (job != null) {
         JobStatus status = JobStatus.downgrade(job.getStatus());
         if (status != null) {
@@ -621,9 +643,10 @@ public TaskReport[] getMapTaskReports(JobID jobId) throws IOException {
     return getTaskReports(jobId, TaskType.MAP);
   }
 
-  private TaskReport[] getTaskReports(JobID jobId, TaskType type) throws IOException {
+  private TaskReport[] getTaskReports(final JobID jobId, TaskType type) throws 
+      IOException {
     try {
-      Job j = cluster.getJob(jobId);
+      Job j = getJobUsingCluster(jobId);
       if(j == null) {
         return EMPTY_TASK_REPORTS;
       }
@@ -688,10 +711,11 @@ public TaskReport[] getReduceTaskReports(String jobId) throws IOException {
    * @param state the state of the task 
    * (pending/running/completed/failed/killed)
    */
-  public void displayTasks(JobID jobId, String type, String state) 
+  public void displayTasks(final JobID jobId, String type, String state) 
   throws IOException {
     try {
-      super.displayTasks(cluster.getJob(jobId), type, state);
+      Job job = getJobUsingCluster(jobId);
+      super.displayTasks(job, type, state);
     } catch (InterruptedException ie) {
       throw new IOException(ie);
     }
@@ -706,15 +730,20 @@ public void displayTasks(JobID jobId, String type, String state)
    */
   public ClusterStatus getClusterStatus() throws IOException {
     try {
-      ClusterMetrics metrics = cluster.getClusterStatus();
-      return new ClusterStatus(metrics.getTaskTrackerCount(),
-        metrics.getBlackListedTaskTrackerCount(), cluster.getTaskTrackerExpiryInterval(),
-        metrics.getOccupiedMapSlots(),
-        metrics.getOccupiedReduceSlots(), metrics.getMapSlotCapacity(),
-        metrics.getReduceSlotCapacity(),
-        cluster.getJobTrackerStatus(),
-        metrics.getDecommissionedTaskTrackerCount());
-    } catch (InterruptedException ie) {
+      return clientUgi.doAs(new PrivilegedExceptionAction<ClusterStatus>() {
+        public ClusterStatus run() throws IOException, InterruptedException {
+          ClusterMetrics metrics = cluster.getClusterStatus();
+          return new ClusterStatus(metrics.getTaskTrackerCount(),
+              metrics.getBlackListedTaskTrackerCount(), cluster.getTaskTrackerExpiryInterval(),
+              metrics.getOccupiedMapSlots(),
+              metrics.getOccupiedReduceSlots(), metrics.getMapSlotCapacity(),
+              metrics.getReduceSlotCapacity(),
+              cluster.getJobTrackerStatus(),
+              metrics.getDecommissionedTaskTrackerCount());
+        }
+      });
+    } 
+    catch (InterruptedException ie) {
       throw new IOException(ie);
     }
   }
@@ -750,13 +779,17 @@ private Collection<BlackListInfo> arrayToBlackListInfo(TaskTrackerInfo[] objs)
    */
   public ClusterStatus getClusterStatus(boolean detailed) throws IOException {
     try {
-      ClusterMetrics metrics = cluster.getClusterStatus();
-      return new ClusterStatus(arrayToStringList(cluster.getActiveTaskTrackers()),
-        arrayToBlackListInfo(cluster.getBlackListedTaskTrackers()),
-        cluster.getTaskTrackerExpiryInterval(), metrics.getOccupiedMapSlots(),
-        metrics.getOccupiedReduceSlots(), metrics.getMapSlotCapacity(),
-        metrics.getReduceSlotCapacity(), 
-        cluster.getJobTrackerStatus());
+      return clientUgi.doAs(new PrivilegedExceptionAction<ClusterStatus>() {
+        public ClusterStatus run() throws IOException, InterruptedException {
+          ClusterMetrics metrics = cluster.getClusterStatus();
+          return new ClusterStatus(arrayToStringList(cluster.getActiveTaskTrackers()),
+              arrayToBlackListInfo(cluster.getBlackListedTaskTrackers()),
+              cluster.getTaskTrackerExpiryInterval(), metrics.getOccupiedMapSlots(),
+              metrics.getOccupiedReduceSlots(), metrics.getMapSlotCapacity(),
+              metrics.getReduceSlotCapacity(), 
+              cluster.getJobTrackerStatus());
+        }
+      });
     } catch (InterruptedException ie) {
       throw new IOException(ie);
     }
@@ -787,7 +820,14 @@ public JobStatus[] jobsToComplete() throws IOException {
    */
   public JobStatus[] getAllJobs() throws IOException {
     try {
-      org.apache.hadoop.mapreduce.JobStatus[] jobs = cluster.getAllJobStatuses();
+      org.apache.hadoop.mapreduce.JobStatus[] jobs = 
+          clientUgi.doAs(new PrivilegedExceptionAction<
+              org.apache.hadoop.mapreduce.JobStatus[]> () {
+            public org.apache.hadoop.mapreduce.JobStatus[] run() 
+                throws IOException, InterruptedException {
+              return cluster.getAllJobStatuses();
+            }
+          });
       JobStatus[] stats = new JobStatus[jobs.length];
       for (int i = 0; i < jobs.length; i++) {
         stats[i] = JobStatus.downgrade(jobs[i]);
@@ -909,7 +949,12 @@ protected long getCounter(org.apache.hadoop.mapreduce.Counters cntrs,
    */
   public int getDefaultMaps() throws IOException {
     try {
-      return cluster.getClusterStatus().getMapSlotCapacity();
+      return clientUgi.doAs(new PrivilegedExceptionAction<Integer>() {
+        @Override
+        public Integer run() throws IOException, InterruptedException {
+          return cluster.getClusterStatus().getMapSlotCapacity();
+        }
+      });
     } catch (InterruptedException ie) {
       throw new IOException(ie);
     }
@@ -923,7 +968,12 @@ public int getDefaultMaps() throws IOException {
    */
   public int getDefaultReduces() throws IOException {
     try {
-      return cluster.getClusterStatus().getReduceSlotCapacity();
+      return clientUgi.doAs(new PrivilegedExceptionAction<Integer>() {
+        @Override
+        public Integer run() throws IOException, InterruptedException {
+          return cluster.getClusterStatus().getReduceSlotCapacity();
+        }
+      });
     } catch (InterruptedException ie) {
       throw new IOException(ie);
     }
@@ -936,8 +986,13 @@ public int getDefaultReduces() throws IOException {
    */
   public Path getSystemDir() {
     try {
-      return cluster.getSystemDir();
-    } catch (IOException ioe) {
+      return clientUgi.doAs(new PrivilegedExceptionAction<Path>() {
+        @Override
+        public Path run() throws IOException, InterruptedException {
+          return cluster.getSystemDir();
+        }
+      });
+    } catch (IOException ioe) {
       return null;
     } catch (InterruptedException ie) {
       return null;
@@ -962,7 +1017,11 @@ private JobQueueInfo[] getJobQueueInfoArray(QueueInfo[] queues)
    */
   public JobQueueInfo[] getRootQueues() throws IOException {
     try {
-      return getJobQueueInfoArray(cluster.getRootQueues());
+      return clientUgi.doAs(new PrivilegedExceptionAction<JobQueueInfo[]>() {
+        public JobQueueInfo[] run() throws IOException, InterruptedException {
+          return getJobQueueInfoArray(cluster.getRootQueues());
+        }
+      });
     } catch (InterruptedException ie) {
       throw new IOException(ie);
     }
@@ -976,9 +1035,13 @@ public JobQueueInfo[] getRootQueues() throws IOException {
    * @return the array of immediate children JobQueueInfo objects
    * @throws IOException
    */
-  public JobQueueInfo[] getChildQueues(String queueName) throws IOException {
+  public JobQueueInfo[] getChildQueues(final String queueName) throws IOException {
     try {
-      return getJobQueueInfoArray(cluster.getChildQueues(queueName));
+      return clientUgi.doAs(new PrivilegedExceptionAction<JobQueueInfo[]>() {
+        public JobQueueInfo[] run() throws IOException, InterruptedException {
+          return getJobQueueInfoArray(cluster.getChildQueues(queueName));
+        }
+      });
     } catch (InterruptedException ie) {
       throw new IOException(ie);
     }
@@ -993,7 +1056,11 @@ public JobQueueInfo[] getChildQueues(String queueName) throws IOException {
    */
  public JobQueueInfo[] getQueues() throws IOException {
     try {
-      return getJobQueueInfoArray(cluster.getQueues());
+      return clientUgi.doAs(new PrivilegedExceptionAction<JobQueueInfo[]>() {
+        public JobQueueInfo[] run() throws IOException, InterruptedException {
+          return getJobQueueInfoArray(cluster.getQueues());
+        }
+      });
     } catch (InterruptedException ie) {
       throw new IOException(ie);
     }
@@ -1007,9 +1074,14 @@ public JobQueueInfo[] getQueues() throws IOException {
    * @throws IOException
    */
-  public JobStatus[] getJobsFromQueue(String queueName) throws IOException {
+  public JobStatus[] getJobsFromQueue(final String queueName) throws IOException {
     try {
-      QueueInfo queue = cluster.getQueue(queueName);
+      QueueInfo queue = clientUgi.doAs(new PrivilegedExceptionAction<QueueInfo>() {
+        @Override
+        public QueueInfo run() throws IOException, InterruptedException {
+          return cluster.getQueue(queueName);
+        }
+      });
       if (queue == null) {
         return null;
       }
@@ -1032,9 +1104,14 @@ public JobStatus[] getJobsFromQueue(String queueName) throws IOException {
    * @return Queue information associated to particular queue.
    * @throws IOException
    */
-  public JobQueueInfo getQueueInfo(String queueName) throws IOException {
+  public JobQueueInfo getQueueInfo(final String queueName) throws IOException {
     try {
-      QueueInfo queueInfo = cluster.getQueue(queueName);
+      QueueInfo queueInfo = clientUgi.doAs(new 
+          PrivilegedExceptionAction<QueueInfo>() {
+        public QueueInfo run() throws IOException, InterruptedException {
+          return cluster.getQueue(queueName);
+        }
+      });
       if (queueInfo != null) {
         return new JobQueueInfo(queueInfo);
       }
@@ -1052,7 +1129,14 @@ public JobQueueInfo getQueueInfo(String queueName) throws IOException {
   public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException {
     try {
       org.apache.hadoop.mapreduce.QueueAclsInfo[] acls = 
-        cluster.getQueueAclsForCurrentUser();
+          clientUgi.doAs(new 
+              PrivilegedExceptionAction<org.apache.hadoop.mapreduce.QueueAclsInfo[]>
+              () {
+            public org.apache.hadoop.mapreduce.QueueAclsInfo[] run() 
+                throws IOException, InterruptedException {
+              return cluster.getQueueAclsForCurrentUser();
+            }
+          });
       QueueAclsInfo[] ret = new QueueAclsInfo[acls.length];
       for (int i = 0 ; i < acls.length; i++ ) {
         ret[i] = QueueAclsInfo.downgrade(acls[i]);
@@ -1070,8 +1154,14 @@ public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException {
    * @throws IOException
    */
   public Token<DelegationTokenIdentifier> 
-    getDelegationToken(Text renewer) throws IOException, InterruptedException {
-    return cluster.getDelegationToken(renewer);
+    getDelegationToken(final Text renewer) throws IOException, InterruptedException {
+    return clientUgi.doAs(new 
+        PrivilegedExceptionAction<Token<DelegationTokenIdentifier>>() {
+      public Token<DelegationTokenIdentifier> run() throws IOException, 
+          InterruptedException {
+        return cluster.getDelegationToken(renewer);
+      }
+    });
   }
 
   /**
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java
index 530aee1999..c6b5adeb43 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java
@@ -30,6 +30,7 @@
 import java.net.URLConnection;
 import java.net.URI;
 import java.security.PrivilegedExceptionAction;
+import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -315,7 +316,12 @@ synchronized void ensureFreshStatus() 
    * @throws IOException
    */
   synchronized void updateStatus() throws IOException, InterruptedException {
-    this.status = cluster.getClient().getJobStatus(status.getJobID());
+    this.status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
+      @Override
+      public JobStatus run() throws IOException, InterruptedException {
+        return cluster.getClient().getJobStatus(status.getJobID());
+      }
+    });
     if (this.status == null) {
       throw new IOException("Job status not available ");
     }
@@ -476,8 +482,16 @@ String getTaskFailureEventString() throws IOException,
       InterruptedException {
     int failCount = 1;
     TaskCompletionEvent lastEvent = null;
-    for (TaskCompletionEvent event : cluster.getClient().getTaskCompletionEvents(
-        status.getJobID(), 0, 10)) {
+    TaskCompletionEvent[] events = ugi.doAs(new 
+        PrivilegedExceptionAction<TaskCompletionEvent[]>() {
+      @Override
+      public TaskCompletionEvent[] run() throws IOException,
+          InterruptedException {
+        return cluster.getClient().getTaskCompletionEvents(
+            status.getJobID(), 0, 10);
+      }
+    });
+    for (TaskCompletionEvent event : events) {
       if (event.getStatus().equals(TaskCompletionEvent.Status.FAILED)) {
         failCount++;
         lastEvent = event;
@@ -500,7 +514,12 @@ String getTaskFailureEventString() throws IOException,
   public TaskReport[] getTaskReports(TaskType type) 
       throws IOException, InterruptedException {
     ensureState(JobState.RUNNING);
-    return cluster.getClient().getTaskReports(getJobID(), type);
+    final TaskType tmpType = type;
+    return ugi.doAs(new PrivilegedExceptionAction<TaskReport[]>() {
+      public TaskReport[] run() throws IOException, InterruptedException {
+        return cluster.getClient().getTaskReports(getJobID(), tmpType);
+      }
+    });
   }
 
   /**
@@ -603,7 +622,14 @@ public void setPriority(JobPriority priority) 
           org.apache.hadoop.mapred.JobPriority.valueOf(priority.name()));
     } else {
       ensureState(JobState.RUNNING);
-      cluster.getClient().setJobPriority(getJobID(), priority.toString());
+      final JobPriority tmpPriority = priority;
+      ugi.doAs(new PrivilegedExceptionAction<Object>() {
+        @Override
+        public Object run() throws IOException, InterruptedException {
+          cluster.getClient().setJobPriority(getJobID(), tmpPriority.toString());
+          return null;
+        }
+      });
     }
   }
 
@@ -615,12 +641,17 @@ public void setPriority(JobPriority priority) 
    * @return an array of {@link TaskCompletionEvent}s
    * @throws IOException
    */
-  public TaskCompletionEvent[] getTaskCompletionEvents(int startFrom,
-      int numEvents) throws IOException, InterruptedException {
+  public TaskCompletionEvent[] getTaskCompletionEvents(final int startFrom,
+      final int numEvents) throws IOException, InterruptedException {
     ensureState(JobState.RUNNING);
-    return cluster.getClient().getTaskCompletionEvents(getJobID(),
-        startFrom, numEvents);
-  }
+    return ugi.doAs(new PrivilegedExceptionAction<TaskCompletionEvent[]>() {
+      @Override
+      public TaskCompletionEvent[] run() throws IOException, InterruptedException {
+        return cluster.getClient().getTaskCompletionEvents(getJobID(),
+            startFrom, numEvents);
+      }
+    });
+  }
 
   /**
    * Kill indicated task attempt.
@@ -628,10 +659,14 @@ public TaskCompletionEvent[] getTaskCompletionEvents(int startFrom,
    * @param taskId the id of the task to be terminated.
    * @throws IOException
    */
-  public boolean killTask(TaskAttemptID taskId) 
+  public boolean killTask(final TaskAttemptID taskId) 
       throws IOException, InterruptedException {
     ensureState(JobState.RUNNING);
-    return cluster.getClient().killTask(taskId, false);
+    return ugi.doAs(new PrivilegedExceptionAction<Boolean>() {
+      public Boolean run() throws IOException, InterruptedException {
+        return cluster.getClient().killTask(taskId, false);
+      }
+    });
   }
 
   /**
@@ -640,10 +675,15 @@ public boolean killTask(TaskAttemptID taskId) 
    * @param taskId the id of the task to be terminated.
    * @throws IOException
    */
-  public boolean failTask(TaskAttemptID taskId) 
+  public boolean failTask(final TaskAttemptID taskId) 
       throws IOException, InterruptedException {
     ensureState(JobState.RUNNING);
-    return cluster.getClient().killTask(taskId, true);
+    return ugi.doAs(new PrivilegedExceptionAction<Boolean>() {
+      @Override
+      public Boolean run() throws IOException, InterruptedException {
+        return cluster.getClient().killTask(taskId, true);
+      }
+    });
   }
 
   /**
@@ -656,7 +696,12 @@ public boolean failTask(TaskAttemptID taskId) 
   public Counters getCounters() 
       throws IOException, InterruptedException {
     ensureState(JobState.RUNNING);
-    return cluster.getClient().getJobCounters(getJobID());
+    return ugi.doAs(new PrivilegedExceptionAction<Counters>() {
+      @Override
+      public Counters run() throws IOException, InterruptedException {
+        return cluster.getClient().getJobCounters(getJobID());
+      }
+    });
   }
 
   /**
@@ -665,10 +710,15 @@ public Counters getCounters() 
    * @return the list of diagnostic messages for the task
    * @throws IOException
    */
-  public String[] getTaskDiagnostics(TaskAttemptID taskid) 
+  public String[] getTaskDiagnostics(final TaskAttemptID taskid) 
       throws IOException, InterruptedException {
     ensureState(JobState.RUNNING);
-    return cluster.getClient().getTaskDiagnostics(taskid);
+    return ugi.doAs(new PrivilegedExceptionAction<String[]>() {
+      @Override
+      public String[] run() throws IOException, InterruptedException {
+        return cluster.getClient().getTaskDiagnostics(taskid);
+      }
+    });
   }
 
   /**
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java
index 99b7e8826e..71ea84bb8c 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java
@@ -22,7 +22,7 @@
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.net.InetSocketAddress;
-import java.security.PrivilegedAction;
+import java.security.PrivilegedExceptionAction;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.List;
@@ -156,6 +156,8 @@ private MRClientProtocol getProxy() throws YarnRemoteException {
           application = rm.getApplicationReport(appId);
           continue;
         }
+        UserGroupInformation newUgi = UserGroupInformation.createRemoteUser(
+            UserGroupInformation.getCurrentUser().getUserName());
         serviceAddr = application.getHost() + ":" + application.getRpcPort();
         if (UserGroupInformation.isSecurityEnabled()) {
           String clientTokenEncoded = application.getClientToken();
@@ -167,11 +169,17 @@ private MRClientProtocol getProxy() throws YarnRemoteException {
               .getHost(), application.getRpcPort());
           clientToken.setService(new Text(addr.getAddress().getHostAddress()
               + ":" + addr.getPort()));
-          UserGroupInformation.getCurrentUser().addToken(clientToken);
+          newUgi.addToken(clientToken);
         }
         LOG.info("The url to track the job: " + application.getTrackingUrl());
         LOG.debug("Connecting to " + serviceAddr);
-        realProxy = instantiateAMProxy(serviceAddr);
+        final String tempStr = serviceAddr;
+        realProxy = newUgi.doAs(new PrivilegedExceptionAction<MRClientProtocol>() {
+          @Override
+          public MRClientProtocol run() throws IOException {
+            return instantiateAMProxy(tempStr);
+          }
+        });
         return realProxy;
       } catch (IOException e) {
         //possibly the AM has crashed
@@ -243,17 +251,11 @@ private MRClientProtocol checkAndGetHSProxy(
 
   MRClientProtocol instantiateAMProxy(final String serviceAddr)
       throws IOException {
-    UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
     LOG.trace("Connecting to ApplicationMaster at: " + serviceAddr);
-    MRClientProtocol proxy = currentUser
-        .doAs(new PrivilegedAction<MRClientProtocol>() {
-          @Override
-          public MRClientProtocol run() {
-            YarnRPC rpc = YarnRPC.create(conf);
-            return (MRClientProtocol) rpc.getProxy(MRClientProtocol.class,
+    YarnRPC rpc = YarnRPC.create(conf);
+    MRClientProtocol proxy = 
+        (MRClientProtocol) rpc.getProxy(MRClientProtocol.class,
                 NetUtils.createSocketAddr(serviceAddr), conf);
-          }
-        });
     LOG.trace("Connected to ApplicationMaster at: " + serviceAddr);
     return proxy;
   }
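
Note (not part of the patch): the recurring pattern above is to capture the UserGroupInformation at construction time and wrap every subsequent protocol call in doAs(), so RPCs always execute as the user that created the JobClient/Job rather than whichever user happens to be current on the calling thread. Below is a minimal, self-contained sketch of that pattern; RpcStub, fetchStatus, and UgiScopedClient are hypothetical stand-ins for Cluster/MRClientProtocol, while UserGroupInformation.getCurrentUser() and doAs() are the real Hadoop APIs the patch uses.

```java
import java.io.IOException;
import java.security.PrivilegedExceptionAction;

import org.apache.hadoop.security.UserGroupInformation;

public class UgiScopedClient {
  // Captured once, the way JobClient.init() now stores clientUgi.
  private final UserGroupInformation clientUgi;

  public UgiScopedClient() throws IOException {
    this.clientUgi = UserGroupInformation.getCurrentUser();
  }

  public String fetchStatus(final String jobId)
      throws IOException, InterruptedException {
    // Every remote call runs as clientUgi, even if the calling thread's
    // current user has changed since this object was constructed.
    return clientUgi.doAs(new PrivilegedExceptionAction<String>() {
      @Override
      public String run() throws IOException {
        return RpcStub.getJobStatus(jobId); // hypothetical protocol call
      }
    });
  }

  /** Hypothetical stand-in for the real RPC proxy. */
  static class RpcStub {
    static String getJobStatus(String jobId) throws IOException {
      return "RUNNING";
    }
  }
}
```

The same reasoning drives the ClientServiceDelegate change: the AM proxy is created inside doAs() on a fresh remote-user UGI that carries only the AM's client token, so the connection authenticates with that token rather than with whatever credentials sit on the thread's current user.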