From 7c8fcbecf14b2e24d54ccb276bb684fdbe62b669 Mon Sep 17 00:00:00 2001 From: Mahadev Konar <mahadev@apache.org> Date: Mon, 22 Aug 2011 19:36:40 +0000 Subject: [PATCH] MAPREDUCE-2701. app/Job.java needs UGI for the user that launched it. (Robert Evans via mahadev) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1160392 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce/CHANGES.txt | 3 ++ .../hadoop/mapreduce/v2/app/MRAppMaster.java | 28 +++++++++-------- .../hadoop/mapreduce/v2/app/job/Job.java | 1 + .../mapreduce/v2/app/job/impl/JobImpl.java | 11 +++++-- .../apache/hadoop/mapreduce/v2/app/MRApp.java | 12 +++++--- .../hadoop/mapreduce/v2/app/MockJobs.java | 5 ++++ .../hadoop/mapreduce/v2/app/TestMRApp.java | 1 + .../v2/app/TestRuntimeEstimators.java | 5 ++++ .../hadoop/mapreduce/v2/hs/CompletedJob.java | 10 ++++++- .../hadoop/mapreduce/v2/hs/JobHistory.java | 3 +- .../hadoop/mapreduce/v2/hs/PartialJob.java | 5 ++++ .../mapreduce/v2/hs/TestJobHistoryEvents.java | 2 +- .../v2/hs/TestJobHistoryParsing.java | 30 +++++++++---------- 13 files changed, 80 insertions(+), 36 deletions(-) diff --git a/hadoop-mapreduce/CHANGES.txt b/hadoop-mapreduce/CHANGES.txt index bb5465005e..8a8335c51b 100644 --- a/hadoop-mapreduce/CHANGES.txt +++ b/hadoop-mapreduce/CHANGES.txt @@ -221,6 +221,9 @@ Trunk (unreleased changes) MAPREDUCE-2854. update INSTALL with config necessary run mapred on yarn. (thomas graves via mahadev) + MAPREDUCE-2701. app/Job.java needs UGI for the user that launched it. + (Robert Evans via mahadev) + OPTIMIZATIONS MAPREDUCE-2026. Make JobTracker.getJobCounters() and diff --git a/hadoop-mapreduce/hadoop-mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce/hadoop-mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java index 0da89333e4..7e925d58be 100644 --- a/hadoop-mapreduce/hadoop-mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java +++ b/hadoop-mapreduce/hadoop-mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java @@ -206,30 +206,33 @@ public class MRAppMaster extends CompositeService { new SpeculatorEventDispatcher()); Credentials fsTokens = new Credentials(); - if (UserGroupInformation.isSecurityEnabled()) { - // Read the file-system tokens from the localized tokens-file. - try { - Path jobSubmitDir = + + UserGroupInformation currentUser = null; + + try { + currentUser = UserGroupInformation.getCurrentUser(); + + if (UserGroupInformation.isSecurityEnabled()) { + // Read the file-system tokens from the localized tokens-file. + Path jobSubmitDir = FileContext.getLocalFSFileContext().makeQualified( new Path(new File(MRConstants.JOB_SUBMIT_DIR) .getAbsolutePath())); - Path jobTokenFile = + Path jobTokenFile = new Path(jobSubmitDir, MRConstants.APPLICATION_TOKENS_FILE); fsTokens.addAll(Credentials.readTokenStorageFile(jobTokenFile, conf)); LOG.info("jobSubmitDir=" + jobSubmitDir + " jobTokenFile=" + jobTokenFile); - UserGroupInformation currentUser = - UserGroupInformation.getCurrentUser(); for (Token<? extends TokenIdentifier> tk : fsTokens.getAllTokens()) { LOG.info(" --- DEBUG: Token of kind " + tk.getKind() + "in current ugi in the AppMaster for service " + tk.getService()); currentUser.addToken(tk); // For use by AppMaster itself. } - } catch (IOException e) { - throw new YarnException(e); } + } catch (IOException e) { + throw new YarnException(e); } super.init(conf); @@ -238,7 +241,7 @@ public class MRAppMaster extends CompositeService { Configuration config = getConfig(); - job = createJob(config, fsTokens); + job = createJob(config, fsTokens, currentUser.getUserName()); /** create a job event for job intialization */ JobEvent initJobEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT); @@ -284,12 +287,13 @@ public class MRAppMaster extends CompositeService { /** Create and initialize (but don't start) a single job. * @param fsTokens */ - protected Job createJob(Configuration conf, Credentials fsTokens) { + protected Job createJob(Configuration conf, Credentials fsTokens, + String user) { // create single job Job newJob = new JobImpl(appID, conf, dispatcher.getEventHandler(), taskAttemptListener, jobTokenSecretManager, fsTokens, clock, startCount, - completedTasksFromPreviousRun, metrics); + completedTasksFromPreviousRun, metrics, user); ((RunningAppContext) context).jobs.put(newJob.getID(), newJob); dispatcher.register(JobFinishEvent.Type.class, diff --git a/hadoop-mapreduce/hadoop-mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Job.java b/hadoop-mapreduce/hadoop-mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Job.java index 1be08ab1ae..15d2f4bb28 100644 --- a/hadoop-mapreduce/hadoop-mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Job.java +++ b/hadoop-mapreduce/hadoop-mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Job.java @@ -51,6 +51,7 @@ public interface Job { int getCompletedMaps(); int getCompletedReduces(); boolean isUber(); + String getUserName(); TaskAttemptCompletionEvent[] getTaskAttemptCompletionEvents(int fromEventId, int maxEvents); diff --git a/hadoop-mapreduce/hadoop-mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java b/hadoop-mapreduce/hadoop-mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java index 0416d3de10..302fd93751 100644 --- a/hadoop-mapreduce/hadoop-mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java +++ b/hadoop-mapreduce/hadoop-mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java @@ -146,6 +146,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, private final Set<TaskId> reduceTasks = new LinkedHashSet<TaskId>(); private final EventHandler eventHandler; private final MRAppMetrics metrics; + private final String userName; private boolean lazyTasksCopyNeeded = false; private volatile Map<TaskId, Task> tasks = new LinkedHashMap<TaskId, Task>(); @@ -368,7 +369,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, EventHandler eventHandler, TaskAttemptListener taskAttemptListener, JobTokenSecretManager jobTokenSecretManager, Credentials fsTokenCredentials, Clock clock, int startCount, - Set<TaskId> completedTasksFromPreviousRun, MRAppMetrics metrics) { + Set<TaskId> completedTasksFromPreviousRun, MRAppMetrics metrics, + String userName) { this.jobId = recordFactory.newRecordInstance(JobId.class); this.jobName = conf.get(JobContext.JOB_NAME, "<missing job name>"); @@ -377,6 +379,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, this.clock = clock; this.completedTasksFromPreviousRun = completedTasksFromPreviousRun; this.startCount = startCount; + this.userName = userName; jobId.setAppId(appID); jobId.setId(appID.getId()); oldJobId = TypeConverter.fromYarn(jobId); @@ -762,6 +765,11 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, return finalState; } + @Override + public String getUserName() { + return userName; + } + @Override public String getName() { return jobName; @@ -1412,5 +1420,4 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, job.finished(JobState.ERROR); } } - } diff --git a/hadoop-mapreduce/hadoop-mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java b/hadoop-mapreduce/hadoop-mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java index 57f95dcc0c..a47ba3cfb8 100644 --- a/hadoop-mapreduce/hadoop-mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java +++ b/hadoop-mapreduce/hadoop-mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java @@ -64,6 +64,7 @@ import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleanupEvent; import org.apache.hadoop.mapreduce.v2.jobhistory.JHConfig; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.Clock; import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -235,9 +236,11 @@ public class MRApp extends MRAppMaster { } @Override - protected Job createJob(Configuration conf, Credentials fsTokens) { + protected Job createJob(Configuration conf, Credentials fsTokens, + String user) { Job newJob = new TestJob(getAppID(), getDispatcher().getEventHandler(), - getTaskAttemptListener(), getContext().getClock()); + getTaskAttemptListener(), getContext().getClock(), + user); ((AppContext) getContext()).getAllJobs().put(newJob.getID(), newJob); getDispatcher().register(JobFinishEvent.Type.class, @@ -382,10 +385,11 @@ public class MRApp extends MRAppMaster { } public TestJob(ApplicationId appID, EventHandler eventHandler, - TaskAttemptListener taskAttemptListener, Clock clock) { + TaskAttemptListener taskAttemptListener, Clock clock, + String user) { super(appID, new Configuration(), eventHandler, taskAttemptListener, new JobTokenSecretManager(), new Credentials(), clock, getStartCount(), - getCompletedTaskFromPreviousRun(), metrics); + getCompletedTaskFromPreviousRun(), metrics, user); // This "this leak" is okay because the retained pointer is in an // instance variable. diff --git a/hadoop-mapreduce/hadoop-mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java b/hadoop-mapreduce/hadoop-mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java index dc2437329b..321dd1d22f 100644 --- a/hadoop-mapreduce/hadoop-mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java +++ b/hadoop-mapreduce/hadoop-mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java @@ -448,6 +448,11 @@ public class MockJobs extends MockApps { JobACL jobOperation) { return true; } + + @Override + public String getUserName() { + throw new UnsupportedOperationException("Not supported yet."); + } }; } } diff --git a/hadoop-mapreduce/hadoop-mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java b/hadoop-mapreduce/hadoop-mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java index c64335acf5..d1dd7203d1 100644 --- a/hadoop-mapreduce/hadoop-mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java +++ b/hadoop-mapreduce/hadoop-mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java @@ -48,6 +48,7 @@ public class TestMRApp { Job job = app.submit(new Configuration()); app.waitForState(job, JobState.SUCCEEDED); app.verifyCompleted(); + Assert.assertEquals(System.getProperty("user.name"),job.getUserName()); } @Test diff --git a/hadoop-mapreduce/hadoop-mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java b/hadoop-mapreduce/hadoop-mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java index a359fd3bec..7abf435ed0 100644 --- a/hadoop-mapreduce/hadoop-mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java +++ b/hadoop-mapreduce/hadoop-mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java @@ -456,6 +456,11 @@ public class TestRuntimeEstimators { JobACL jobOperation) { return true; } + + @Override + public String getUserName() { + throw new UnsupportedOperationException("Not supported yet."); + } } /* diff --git a/hadoop-mapreduce/hadoop-mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java b/hadoop-mapreduce/hadoop-mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java index abb99784e2..0f1b08547b 100644 --- a/hadoop-mapreduce/hadoop-mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java +++ b/hadoop-mapreduce/hadoop-mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java @@ -69,17 +69,20 @@ public class CompletedJob implements org.apache.hadoop.mapreduce.v2.app.job.Job private final Map<TaskId, Task> tasks = new HashMap<TaskId, Task>(); private final Map<TaskId, Task> mapTasks = new HashMap<TaskId, Task>(); private final Map<TaskId, Task> reduceTasks = new HashMap<TaskId, Task>(); + private final String user; private List<TaskAttemptCompletionEvent> completionEvents = null; private JobInfo jobInfo; - public CompletedJob(Configuration conf, JobId jobId, Path historyFile, boolean loadTasks) throws IOException { + public CompletedJob(Configuration conf, JobId jobId, Path historyFile, + boolean loadTasks, String userName) throws IOException { LOG.info("Loading job: " + jobId + " from file: " + historyFile); this.conf = conf; this.jobId = jobId; loadFullHistoryData(loadTasks, historyFile); + user = userName; counters = TypeConverter.toYarn(jobInfo.getTotalCounters()); diagnostics.add(jobInfo.getErrorInfo()); report = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(JobReport.class); @@ -297,4 +300,9 @@ public class CompletedJob implements org.apache.hadoop.mapreduce.v2.app.job.Job return aclsMgr.checkAccess(callerUGI, jobOperation, jobInfo.getUsername(), jobACL); } + + @Override + public String getUserName() { + return user; + } } diff --git a/hadoop-mapreduce/hadoop-mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java b/hadoop-mapreduce/hadoop-mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java index 59635f225e..389de0bd70 100644 --- a/hadoop-mapreduce/hadoop-mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java +++ b/hadoop-mapreduce/hadoop-mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java @@ -612,7 +612,8 @@ public class JobHistory extends AbstractService implements HistoryContext { private Job loadJob(MetaInfo metaInfo) { synchronized(metaInfo) { try { - Job job = new CompletedJob(conf, metaInfo.getJobIndexInfo().getJobId(), metaInfo.getHistoryFile(), true); + Job job = new CompletedJob(conf, metaInfo.getJobIndexInfo().getJobId(), + metaInfo.getHistoryFile(), true, metaInfo.getJobIndexInfo().getUser()); addToLoadedJobCache(job); return job; } catch (IOException e) { diff --git a/hadoop-mapreduce/hadoop-mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java b/hadoop-mapreduce/hadoop-mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java index f0393e26ea..b90bd338b3 100644 --- a/hadoop-mapreduce/hadoop-mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java +++ b/hadoop-mapreduce/hadoop-mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java @@ -139,5 +139,10 @@ public class PartialJob implements org.apache.hadoop.mapreduce.v2.app.job.Job { public boolean checkAccess(UserGroupInformation callerUGI, JobACL jobOperation) { return false; } + + @Override + public String getUserName() { + return null; + } } diff --git a/hadoop-mapreduce/hadoop-mr-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEvents.java b/hadoop-mapreduce/hadoop-mr-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEvents.java index de4a087c9d..634c596ff5 100644 --- a/hadoop-mapreduce/hadoop-mr-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEvents.java +++ b/hadoop-mapreduce/hadoop-mr-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEvents.java @@ -70,7 +70,7 @@ public class TestJobHistoryEvents { Job parsedJob = context.getJob(jobId); Assert.assertEquals("CompletedMaps not correct", 2, parsedJob.getCompletedMaps()); - + Assert.assertEquals(System.getProperty("user.name"), parsedJob.getUserName()); Map<TaskId, Task> tasks = parsedJob.getTasks(); Assert.assertEquals("No of tasks not correct", 3, tasks.size()); diff --git a/hadoop-mapreduce/hadoop-mr-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java b/hadoop-mapreduce/hadoop-mr-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java index bc85066db9..6a5a57fa1c 100644 --- a/hadoop-mapreduce/hadoop-mr-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java +++ b/hadoop-mapreduce/hadoop-mr-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java @@ -83,26 +83,26 @@ public class TestJobHistoryParsing { JobHistoryParser parser = new JobHistoryParser(in); JobInfo jobInfo = parser.parse(); - Assert.assertTrue ("Incorrect username ", - jobInfo.getUsername().equals("mapred")); - Assert.assertTrue("Incorrect jobName ", - jobInfo.getJobname().equals("test")); - Assert.assertTrue("Incorrect queuename ", - jobInfo.getJobQueueName().equals("default")); - Assert.assertTrue("incorrect conf path", - jobInfo.getJobConfPath().equals("test")); - Assert.assertTrue("incorrect finishedMap ", - jobInfo.getFinishedMaps() == 2); - Assert.assertTrue("incorrect finishedReduces ", - jobInfo.getFinishedReduces() == 1); + Assert.assertEquals ("Incorrect username ", + "mapred", jobInfo.getUsername()); + Assert.assertEquals("Incorrect jobName ", + "test", jobInfo.getJobname()); + Assert.assertEquals("Incorrect queuename ", + "default", jobInfo.getJobQueueName()); + Assert.assertEquals("incorrect conf path", + "test", jobInfo.getJobConfPath()); + Assert.assertEquals("incorrect finishedMap ", + 2, jobInfo.getFinishedMaps()); + Assert.assertEquals("incorrect finishedReduces ", + 1, jobInfo.getFinishedReduces()); int totalTasks = jobInfo.getAllTasks().size(); - Assert.assertTrue("total number of tasks is incorrect ", totalTasks == 3); + Assert.assertEquals("total number of tasks is incorrect ", 3, totalTasks); //Assert at taskAttempt level for (TaskInfo taskInfo : jobInfo.getAllTasks().values()) { int taskAttemptCount = taskInfo.getAllTaskAttempts().size(); - Assert.assertTrue("total number of task attempts ", - taskAttemptCount == 1); + Assert.assertEquals("total number of task attempts ", + 1, taskAttemptCount); } String summaryFileName = JobHistoryUtils