MAPREDUCE-2701. app/Job.java needs UGI for the user that launched it. (Robert Evans via mahadev)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1160392 13f79535-47bb-0310-9956-ffa450edef68
Mahadev Konar 2011-08-22 19:36:40 +00:00
parent 14b97a91d9
commit 7c8fcbecf1
13 changed files with 80 additions and 36 deletions

View File

@@ -221,6 +221,9 @@ Trunk (unreleased changes)
     MAPREDUCE-2854. update INSTALL with config necessary run mapred on yarn.
     (thomas graves via mahadev)
 
+    MAPREDUCE-2701. app/Job.java needs UGI for the user that launched it.
+    (Robert Evans via mahadev)
+
     OPTIMIZATIONS
 
     MAPREDUCE-2026. Make JobTracker.getJobCounters() and

View File

@@ -206,30 +206,33 @@ public void init(final Configuration conf) {
         new SpeculatorEventDispatcher());
 
     Credentials fsTokens = new Credentials();
 
-    if (UserGroupInformation.isSecurityEnabled()) {
-      // Read the file-system tokens from the localized tokens-file.
-      try {
-        Path jobSubmitDir =
+    UserGroupInformation currentUser = null;
+
+    try {
+      currentUser = UserGroupInformation.getCurrentUser();
+
+      if (UserGroupInformation.isSecurityEnabled()) {
+        // Read the file-system tokens from the localized tokens-file.
+        Path jobSubmitDir =
             FileContext.getLocalFSFileContext().makeQualified(
                 new Path(new File(MRConstants.JOB_SUBMIT_DIR)
                     .getAbsolutePath()));
-        Path jobTokenFile =
+        Path jobTokenFile =
             new Path(jobSubmitDir, MRConstants.APPLICATION_TOKENS_FILE);
         fsTokens.addAll(Credentials.readTokenStorageFile(jobTokenFile, conf));
         LOG.info("jobSubmitDir=" + jobSubmitDir + " jobTokenFile="
             + jobTokenFile);
-        UserGroupInformation currentUser =
-            UserGroupInformation.getCurrentUser();
+
         for (Token<? extends TokenIdentifier> tk : fsTokens.getAllTokens()) {
           LOG.info(" --- DEBUG: Token of kind " + tk.getKind()
               + "in current ugi in the AppMaster for service "
               + tk.getService());
           currentUser.addToken(tk); // For use by AppMaster itself.
         }
-      } catch (IOException e) {
-        throw new YarnException(e);
       }
+    } catch (IOException e) {
+      throw new YarnException(e);
     }
 
     super.init(conf);
@@ -238,7 +241,7 @@ public void init(final Configuration conf) {
 
     Configuration config = getConfig();
 
-    job = createJob(config, fsTokens);
+    job = createJob(config, fsTokens, currentUser.getUserName());
 
     /** create a job event for job intialization */
     JobEvent initJobEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT);
@@ -284,12 +287,13 @@ public void init(final Configuration conf) {
 
   /** Create and initialize (but don't start) a single job.
    * @param fsTokens */
-  protected Job createJob(Configuration conf, Credentials fsTokens) {
+  protected Job createJob(Configuration conf, Credentials fsTokens,
+      String user) {
 
     // create single job
     Job newJob = new JobImpl(appID, conf, dispatcher.getEventHandler(),
         taskAttemptListener, jobTokenSecretManager, fsTokens, clock, startCount,
-        completedTasksFromPreviousRun, metrics);
+        completedTasksFromPreviousRun, metrics, user);
     ((RunningAppContext) context).jobs.put(newJob.getID(), newJob);
 
     dispatcher.register(JobFinishEvent.Type.class,
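
Taken together, the hunks above hoist the UserGroupInformation.getCurrentUser() call out of the isSecurityEnabled() branch, so the AM resolves the launching user on unsecured clusters too, and then threads that name into createJob(). A condensed sketch of the resulting flow, not the committed code: the TokenFlowSketch wrapper, method name, and string parameters are ours for illustration, while the Hadoop calls mirror the diff.

    import java.io.File;
    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileContext;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.security.Credentials;
    import org.apache.hadoop.security.UserGroupInformation;
    import org.apache.hadoop.security.token.Token;
    import org.apache.hadoop.security.token.TokenIdentifier;

    // Hypothetical wrapper; in the commit this logic lives in MRAppMaster.init().
    class TokenFlowSketch {
      /** Resolves the launching user, attaching localized filesystem tokens
       *  to that user's UGI when security is enabled. */
      static String resolveUser(Configuration conf, Credentials fsTokens,
          String jobSubmitDirName, String tokensFileName) throws IOException {
        // Runs unconditionally now, so even an unsecured cluster learns
        // which user launched the AM.
        UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
        if (UserGroupInformation.isSecurityEnabled()) {
          // Only the token-file read remains security-specific.
          Path jobSubmitDir = FileContext.getLocalFSFileContext().makeQualified(
              new Path(new File(jobSubmitDirName).getAbsolutePath()));
          Path jobTokenFile = new Path(jobSubmitDir, tokensFileName);
          fsTokens.addAll(Credentials.readTokenStorageFile(jobTokenFile, conf));
          for (Token<? extends TokenIdentifier> tk : fsTokens.getAllTokens()) {
            currentUser.addToken(tk); // for use by the AppMaster itself
          }
        }
        return currentUser.getUserName();
      }
    }

The returned name is what createJob(config, fsTokens, currentUser.getUserName()) receives in the second hunk.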

View File

@@ -51,6 +51,7 @@ public interface Job {
   int getCompletedMaps();
   int getCompletedReduces();
   boolean isUber();
+  String getUserName();
 
   TaskAttemptCompletionEvent[]
       getTaskAttemptCompletionEvents(int fromEventId, int maxEvents);
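
Every Job view (the live JobImpl, the history server's CompletedJob and PartialJob, and the test mocks below) must now answer for its owner. A minimal hypothetical caller, assuming only this interface; the JobOwnerFormatter name is ours:

    import org.apache.hadoop.mapreduce.v2.app.job.Job;

    // Illustration only: a one-line ownership summary via the new accessor.
    class JobOwnerFormatter {
      static String ownerLine(Job job) {
        return "Job " + job.getID() + " run by " + job.getUserName();
      }
    }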

View File

@@ -146,6 +146,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
   private final Set<TaskId> reduceTasks = new LinkedHashSet<TaskId>();
   private final EventHandler eventHandler;
   private final MRAppMetrics metrics;
+  private final String userName;
 
   private boolean lazyTasksCopyNeeded = false;
   private volatile Map<TaskId, Task> tasks = new LinkedHashMap<TaskId, Task>();
@@ -368,7 +369,8 @@ public JobImpl(ApplicationId appID, Configuration conf,
       EventHandler eventHandler, TaskAttemptListener taskAttemptListener,
       JobTokenSecretManager jobTokenSecretManager,
       Credentials fsTokenCredentials, Clock clock, int startCount,
-      Set<TaskId> completedTasksFromPreviousRun, MRAppMetrics metrics) {
+      Set<TaskId> completedTasksFromPreviousRun, MRAppMetrics metrics,
+      String userName) {
 
     this.jobId = recordFactory.newRecordInstance(JobId.class);
     this.jobName = conf.get(JobContext.JOB_NAME, "<missing job name>");
@@ -377,6 +379,7 @@ public JobImpl(ApplicationId appID, Configuration conf,
     this.clock = clock;
     this.completedTasksFromPreviousRun = completedTasksFromPreviousRun;
     this.startCount = startCount;
+    this.userName = userName;
     jobId.setAppId(appID);
     jobId.setId(appID.getId());
     oldJobId = TypeConverter.fromYarn(jobId);
@@ -762,6 +765,11 @@ JobState finished(JobState finalState) {
     return finalState;
   }
 
+  @Override
+  public String getUserName() {
+    return userName;
+  }
+
   @Override
   public String getName() {
     return jobName;
@@ -1412,5 +1420,4 @@ public void transition(JobImpl job, JobEvent event) {
       job.finished(JobState.ERROR);
     }
   }
-
 }

View File

@@ -64,6 +64,7 @@
 import org.apache.hadoop.mapreduce.v2.jobhistory.JHConfig;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -235,9 +236,11 @@ public void verifyCompleted() {
   }
 
   @Override
-  protected Job createJob(Configuration conf, Credentials fsTokens) {
+  protected Job createJob(Configuration conf, Credentials fsTokens,
+      String user) {
     Job newJob = new TestJob(getAppID(), getDispatcher().getEventHandler(),
-        getTaskAttemptListener(), getContext().getClock());
+        getTaskAttemptListener(), getContext().getClock(),
+        user);
     ((AppContext) getContext()).getAllJobs().put(newJob.getID(), newJob);
 
     getDispatcher().register(JobFinishEvent.Type.class,
@@ -382,10 +385,11 @@ protected StateMachine<JobState, JobEventType, JobEvent> getStateMachine() {
     }
 
     public TestJob(ApplicationId appID, EventHandler eventHandler,
-        TaskAttemptListener taskAttemptListener, Clock clock) {
+        TaskAttemptListener taskAttemptListener, Clock clock,
+        String user) {
       super(appID, new Configuration(), eventHandler, taskAttemptListener,
           new JobTokenSecretManager(), new Credentials(), clock, getStartCount(),
-          getCompletedTaskFromPreviousRun(), metrics);
+          getCompletedTaskFromPreviousRun(), metrics, user);
 
       // This "this leak" is okay because the retained pointer is in an
       // instance variable.

View File

@@ -448,6 +448,11 @@ public boolean checkAccess(UserGroupInformation callerUGI,
           JobACL jobOperation) {
         return true;
       }
+
+      @Override
+      public String getUserName() {
+        throw new UnsupportedOperationException("Not supported yet.");
+      }
     };
   }
 }

View File

@@ -48,6 +48,7 @@ public void testMapReduce() throws Exception {
     Job job = app.submit(new Configuration());
     app.waitForState(job, JobState.SUCCEEDED);
     app.verifyCompleted();
+    Assert.assertEquals(System.getProperty("user.name"),job.getUserName());
   }
 
   @Test

View File

@@ -456,6 +456,11 @@ public boolean checkAccess(UserGroupInformation callerUGI,
         JobACL jobOperation) {
       return true;
     }
+
+    @Override
+    public String getUserName() {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
   }
 
   /*

View File

@@ -69,17 +69,20 @@ public class CompletedJob implements org.apache.hadoop.mapreduce.v2.app.job.Job
   private final Map<TaskId, Task> tasks = new HashMap<TaskId, Task>();
   private final Map<TaskId, Task> mapTasks = new HashMap<TaskId, Task>();
   private final Map<TaskId, Task> reduceTasks = new HashMap<TaskId, Task>();
+  private final String user;
 
   private List<TaskAttemptCompletionEvent> completionEvents = null;
   private JobInfo jobInfo;
 
-  public CompletedJob(Configuration conf, JobId jobId, Path historyFile, boolean loadTasks) throws IOException {
+  public CompletedJob(Configuration conf, JobId jobId, Path historyFile,
+      boolean loadTasks, String userName) throws IOException {
     LOG.info("Loading job: " + jobId + " from file: " + historyFile);
 
     this.conf = conf;
     this.jobId = jobId;
 
     loadFullHistoryData(loadTasks, historyFile);
+    user = userName;
     counters = TypeConverter.toYarn(jobInfo.getTotalCounters());
     diagnostics.add(jobInfo.getErrorInfo());
     report = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(JobReport.class);
@@ -297,4 +300,9 @@ public boolean checkAccess(UserGroupInformation callerUGI, JobACL jobOperation)
     return aclsMgr.checkAccess(callerUGI, jobOperation,
         jobInfo.getUsername(), jobACL);
   }
+
+  @Override
+  public String getUserName() {
+    return user;
+  }
 }

View File

@@ -612,7 +612,8 @@ public void run() {
   private Job loadJob(MetaInfo metaInfo) {
     synchronized(metaInfo) {
       try {
-        Job job = new CompletedJob(conf, metaInfo.getJobIndexInfo().getJobId(), metaInfo.getHistoryFile(), true);
+        Job job = new CompletedJob(conf, metaInfo.getJobIndexInfo().getJobId(),
+            metaInfo.getHistoryFile(), true, metaInfo.getJobIndexInfo().getUser());
         addToLoadedJobCache(job);
         return job;
       } catch (IOException e) {
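
With the user recorded in the history file's index entry, the history server can satisfy getUserName() long after the AM has exited. A hedged sketch of the call pattern above, with the helper class and parameter list ours for illustration (the CompletedJob constructor arguments come from the diff):

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.v2.api.records.JobId;
    import org.apache.hadoop.mapreduce.v2.app.job.Job;
    import org.apache.hadoop.mapreduce.v2.hs.CompletedJob;

    // Hypothetical loader; in the commit the equivalent call sits in
    // JobHistory.loadJob(), which pulls jobId, historyFile, and user
    // out of the job index (JobIndexInfo).
    class CompletedJobLoader {
      static Job load(Configuration conf, JobId jobId, Path historyFile,
          String user) throws IOException {
        return new CompletedJob(conf, jobId, historyFile,
            true /* loadTasks */, user);
      }
    }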

View File

@@ -139,5 +139,10 @@ public TaskAttemptCompletionEvent[] getTaskAttemptCompletionEvents(
 
   public boolean checkAccess(UserGroupInformation callerUGI, JobACL jobOperation) {
     return false;
   }
+
+  @Override
+  public String getUserName() {
+    return null;
+  }
 }

View File

@@ -70,7 +70,7 @@ public void testHistoryEvents() throws Exception {
     Job parsedJob = context.getJob(jobId);
     Assert.assertEquals("CompletedMaps not correct", 2,
         parsedJob.getCompletedMaps());
-
+    Assert.assertEquals(System.getProperty("user.name"), parsedJob.getUserName());
 
     Map<TaskId, Task> tasks = parsedJob.getTasks();
     Assert.assertEquals("No of tasks not correct", 3, tasks.size());

View File

@@ -83,26 +83,26 @@ public void testHistoryParsing() throws Exception {
     JobHistoryParser parser = new JobHistoryParser(in);
     JobInfo jobInfo = parser.parse();
 
-    Assert.assertTrue ("Incorrect username ",
-        jobInfo.getUsername().equals("mapred"));
-    Assert.assertTrue("Incorrect jobName ",
-        jobInfo.getJobname().equals("test"));
-    Assert.assertTrue("Incorrect queuename ",
-        jobInfo.getJobQueueName().equals("default"));
-    Assert.assertTrue("incorrect conf path",
-        jobInfo.getJobConfPath().equals("test"));
-    Assert.assertTrue("incorrect finishedMap ",
-        jobInfo.getFinishedMaps() == 2);
-    Assert.assertTrue("incorrect finishedReduces ",
-        jobInfo.getFinishedReduces() == 1);
+    Assert.assertEquals ("Incorrect username ",
+        "mapred", jobInfo.getUsername());
+    Assert.assertEquals("Incorrect jobName ",
+        "test", jobInfo.getJobname());
+    Assert.assertEquals("Incorrect queuename ",
+        "default", jobInfo.getJobQueueName());
+    Assert.assertEquals("incorrect conf path",
+        "test", jobInfo.getJobConfPath());
+    Assert.assertEquals("incorrect finishedMap ",
+        2, jobInfo.getFinishedMaps());
+    Assert.assertEquals("incorrect finishedReduces ",
+        1, jobInfo.getFinishedReduces());
 
     int totalTasks = jobInfo.getAllTasks().size();
-    Assert.assertTrue("total number of tasks is incorrect ", totalTasks == 3);
+    Assert.assertEquals("total number of tasks is incorrect ", 3, totalTasks);
 
     //Assert at taskAttempt level
     for (TaskInfo taskInfo : jobInfo.getAllTasks().values()) {
       int taskAttemptCount = taskInfo.getAllTaskAttempts().size();
-      Assert.assertTrue("total number of task attempts ",
-          taskAttemptCount == 1);
+      Assert.assertEquals("total number of task attempts ",
+          1, taskAttemptCount);
     }
     String summaryFileName = JobHistoryUtils
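
A side note on the assertTrue-to-assertEquals conversions in this file: assertEquals reports the expected and actual values on failure, while assertTrue can only echo its message. A small JUnit 4 illustration; the class is ours:

    import org.junit.Assert;
    import org.junit.Test;

    public class AssertStyleExample {
      @Test
      public void assertEqualsReportsValues() {
        int finishedMaps = 2;
        // Old style: a failure prints only "incorrect finishedMap".
        Assert.assertTrue("incorrect finishedMap", finishedMaps == 2);
        // New style: a failure also prints expected:<2> but was:<N>.
        Assert.assertEquals("incorrect finishedMap", 2, finishedMaps);
      }
    }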