MAPREDUCE-3792. Fix "bin/mapred job -list" to display all jobs instead of only the jobs owned by the user. Contributed by Jason Lowe.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1296721 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2012-03-03 23:40:35 +00:00
parent 26cd2356e0
commit fab57a144d
12 changed files with 162 additions and 69 deletions

View File

@ -241,6 +241,9 @@ Release 0.23.2 - UNRELEASED
MAPREDUCE-3896. Add user information to the delegation token issued by the
history server. (Vinod Kumar Vavilapalli via sseth)
MAPREDUCE-3792. Fix "bin/mapred job -list" to display all jobs instead of
only the jobs owned by the user. (Jason Lowe via vinodkv)
Release 0.23.1 - 2012-02-17
INCOMPATIBLE CHANGES

View File

@ -590,7 +590,9 @@ public void displayJobList(JobStatus[] jobs)
@Private
public static String headerPattern = "%23s\t%10s\t%14s\t%12s\t%12s\t%10s\t%15s\t%15s\t%8s\t%8s\t%10s\t%10s\n";
@Private
public static String dataPattern = "%23s\t%10s\t%14d\t%12s\t%12s\t%10s\t%14d\t%14d\t%7dM\t%7sM\t%9dM\t%10s\n";
public static String dataPattern = "%23s\t%10s\t%14d\t%12s\t%12s\t%10s\t%15s\t%15s\t%8s\t%8s\t%10s\t%10s\n";
private static String memPattern = "%dM";
private static String UNAVAILABLE = "N/A";
@Private
public void displayJobList(JobStatus[] jobs, PrintWriter writer) {
@ -599,15 +601,20 @@ public void displayJobList(JobStatus[] jobs, PrintWriter writer) {
"Queue", "Priority", "UsedContainers",
"RsvdContainers", "UsedMem", "RsvdMem", "NeededMem", "AM info");
for (JobStatus job : jobs) {
int numUsedSlots = job.getNumUsedSlots();
int numReservedSlots = job.getNumReservedSlots();
int usedMem = job.getUsedMem();
int rsvdMem = job.getReservedMem();
int neededMem = job.getNeededMem();
writer.printf(dataPattern,
job.getJobID().toString(), job.getState(), job.getStartTime(),
job.getUsername(), job.getQueue(),
job.getPriority().name(),
job.getNumUsedSlots(),
job.getNumReservedSlots(),
job.getUsedMem(),
job.getReservedMem(),
job.getNeededMem(),
numUsedSlots < 0 ? UNAVAILABLE : numUsedSlots,
numReservedSlots < 0 ? UNAVAILABLE : numReservedSlots,
usedMem < 0 ? UNAVAILABLE : String.format(memPattern, usedMem),
rsvdMem < 0 ? UNAVAILABLE : String.format(memPattern, rsvdMem),
neededMem < 0 ? UNAVAILABLE : String.format(memPattern, neededMem),
job.getSchedulingInfo());
}
writer.flush();

View File

@ -79,6 +79,7 @@
public class ClientServiceDelegate {
private static final Log LOG = LogFactory.getLog(ClientServiceDelegate.class);
private static final String UNAVAILABLE = "N/A";
// Caches for per-user NotRunningJobs
private HashMap<JobState, HashMap<String, NotRunningJob>> notRunningJobs;
@ -160,6 +161,13 @@ private MRClientProtocol getProxy() throws YarnRemoteException {
LOG.debug("Application state is " + application.getYarnApplicationState());
application = rm.getApplicationReport(appId);
continue;
} else if (UNAVAILABLE.equals(application.getHost())) {
if (!amAclDisabledStatusLogged) {
LOG.info("Job " + jobId + " is running, but the host is unknown."
+ " Verify user has VIEW_JOB access.");
amAclDisabledStatusLogged = true;
}
return getNotRunningJob(application, JobState.RUNNING);
}
if(!conf.getBoolean(MRJobConfig.JOB_AM_ACCESS_DISABLED, false)) {
UserGroupInformation newUgi = UserGroupInformation.createRemoteUser(
@ -369,9 +377,12 @@ public JobStatus getJobStatus(JobID oldJobID) throws YarnRemoteException {
report.setJobFile(jobFile);
}
String historyTrackingUrl = report.getTrackingUrl();
return TypeConverter.fromYarn(report, "http://"
+ (StringUtils.isNotEmpty(historyTrackingUrl) ? historyTrackingUrl
: trackingUrl));
String url = StringUtils.isNotEmpty(historyTrackingUrl)
? historyTrackingUrl : trackingUrl;
if (!UNAVAILABLE.equals(url)) {
url = "http://" + url;
}
return TypeConverter.fromYarn(report, url);
}
public org.apache.hadoop.mapreduce.TaskReport[] getTaskReports(JobID oldJobID, TaskType taskType)

View File

@ -143,6 +143,18 @@ public KillApplicationResponse forceKillApplication(
* {@link GetApplicationReportResponse} which includes the
* {@link ApplicationReport} for the application.</p>
*
* <p>If the user does not have <code>VIEW_APP</code> access then the
* following fields in the report will be set to stubbed values:
* <ul>
* <li>host - set to "N/A"</li>
* <li>RPC port - set to -1</li>
* <li>client token - set to "N/A"</li>
* <li>diagnostics - set to "N/A"</li>
* <li>tracking URL - set to "N/A"</li>
* <li>original tracking URL - set to "N/A"</li>
* <li>resource usage report - all values are -1</li>
* </ul></p>
*
* @param request request for an application report
* @return application report
* @throws YarnRemoteException
@ -176,6 +188,11 @@ public GetClusterMetricsResponse getClusterMetrics(
* {@link GetAllApplicationsResponse} which includes the
* {@link ApplicationReport} for all the applications.</p>
*
* <p>If the user does not have <code>VIEW_APP</code> access for an
* application then the corresponding report will be filtered as
* described in {@link #getApplicationReport(GetApplicationReportRequest)}.
* </p>
*
* @param request request for report on all running applications
* @return report on all running applications
* @throws YarnRemoteException

View File

@ -334,6 +334,19 @@ public static ApplicationReport newApplicationReport(
return report;
}
public static ApplicationResourceUsageReport newApplicationResourceUsageReport(
int numUsedContainers, int numReservedContainers, Resource usedResources,
Resource reservedResources, Resource neededResources) {
ApplicationResourceUsageReport report =
recordFactory.newRecordInstance(ApplicationResourceUsageReport.class);
report.setNumUsedContainers(numUsedContainers);
report.setNumReservedContainers(numReservedContainers);
report.setUsedResources(usedResources);
report.setReservedResources(reservedResources);
report.setNeededResources(neededResources);
return report;
}
public static Resource newResource(int memory) {
Resource resource = recordFactory.newRecordInstance(Resource.class);
resource.setMemory(memory);

View File

@ -218,14 +218,10 @@ public GetApplicationReportResponse getApplicationReport(
+ "absent application " + applicationId);
}
if (!checkAccess(callerUGI, application.getUser(),
ApplicationAccessType.VIEW_APP, applicationId)) {
throw RPCUtil.getRemoteException(new AccessControlException("User "
+ callerUGI.getShortUserName() + " cannot perform operation "
+ ApplicationAccessType.VIEW_APP.name() + " on " + applicationId));
}
ApplicationReport report = application.createAndGetApplicationReport();
boolean allowAccess = checkAccess(callerUGI, application.getUser(),
ApplicationAccessType.VIEW_APP, applicationId);
ApplicationReport report =
application.createAndGetApplicationReport(allowAccess);
GetApplicationReportResponse response = recordFactory
.newRecordInstance(GetApplicationReportResponse.class);
@ -349,14 +345,9 @@ public GetAllApplicationsResponse getAllApplications(
List<ApplicationReport> reports = new ArrayList<ApplicationReport>();
for (RMApp application : this.rmContext.getRMApps().values()) {
// Only give out the applications viewable by the user as
// ApplicationReport has confidential information like client-token, ACLs
// etc. Web UI displays all applications though as we filter and print
// only public information there.
if (checkAccess(callerUGI, application.getUser(),
ApplicationAccessType.VIEW_APP, application.getApplicationId())) {
reports.add(application.createAndGetApplicationReport());
}
boolean allowAccess = checkAccess(callerUGI, application.getUser(),
ApplicationAccessType.VIEW_APP, application.getApplicationId());
reports.add(application.createAndGetApplicationReport(allowAccess));
}
GetAllApplicationsResponse response =
@ -395,7 +386,7 @@ public GetQueueInfoResponse getQueueInfo(GetQueueInfoRequest request)
appReports = new ArrayList<ApplicationReport>(
apps.size());
for (RMApp app : apps) {
appReports.add(app.createAndGetApplicationReport());
appReports.add(app.createAndGetApplicationReport(true));
}
}
queueInfo.setApplications(appReports);

View File

@ -92,12 +92,25 @@ public interface RMApp extends EventHandler<RMAppEvent> {
/**
* To get the status of an application in the RM, this method can be used.
* If full access is not allowed then the following fields in the report
* will be stubbed:
* <ul>
* <li>host - set to "N/A"</li>
* <li>RPC port - set to -1</li>
* <li>client token - set to "N/A"</li>
* <li>diagnostics - set to "N/A"</li>
* <li>tracking URL - set to "N/A"</li>
* <li>original tracking URL - set to "N/A"</li>
* <li>resource usage report - all values are -1</li>
* </ul>
*
* @param allowAccess whether to allow full access to the report
* @return the {@link ApplicationReport} detailing the status of the application.
*/
ApplicationReport createAndGetApplicationReport();
ApplicationReport createAndGetApplicationReport(boolean allowAccess);
/**
* Application level metadata is stored in {@link ApplicationStore} whicn
* Application level metadata is stored in {@link ApplicationStore} which
* can persist the information.
* @return the {@link ApplicationStore} for this {@link RMApp}.
*/

View File

@ -47,6 +47,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
@ -64,6 +65,7 @@
public class RMAppImpl implements RMApp {
private static final Log LOG = LogFactory.getLog(RMAppImpl.class);
private static final String UNAVAILABLE = "N/A";
// Immutable fields
private final ApplicationId applicationId;
@ -162,6 +164,12 @@ RMAppEventType.KILL, new KillAppAndAttemptTransition())
private final StateMachine<RMAppState, RMAppEventType, RMAppEvent>
stateMachine;
private static final ApplicationResourceUsageReport
DUMMY_APPLICATION_RESOURCE_USAGE_REPORT =
BuilderUtils.newApplicationResourceUsageReport(-1, -1,
Resources.createResource(-1), Resources.createResource(-1),
Resources.createResource(-1));
public RMAppImpl(ApplicationId applicationId, RMContext rmContext,
Configuration config, String name, String user, String queue,
ApplicationSubmissionContext submissionContext, String clientTokenStr,
@ -324,29 +332,35 @@ private FinalApplicationStatus createFinalApplicationStatus(RMAppState state) {
@Override
public ApplicationReport createAndGetApplicationReport() {
public ApplicationReport createAndGetApplicationReport(boolean allowAccess) {
this.readLock.lock();
try {
String clientToken = "N/A";
String trackingUrl = "N/A";
String host = "N/A";
String origTrackingUrl = "N/A";
String clientToken = UNAVAILABLE;
String trackingUrl = UNAVAILABLE;
String host = UNAVAILABLE;
String origTrackingUrl = UNAVAILABLE;
int rpcPort = -1;
ApplicationResourceUsageReport appUsageReport = null;
FinalApplicationStatus finishState = getFinalApplicationStatus();
if (this.currentAttempt != null) {
trackingUrl = this.currentAttempt.getTrackingUrl();
origTrackingUrl = this.currentAttempt.getOriginalTrackingUrl();
clientToken = this.currentAttempt.getClientToken();
host = this.currentAttempt.getHost();
rpcPort = this.currentAttempt.getRpcPort();
appUsageReport = currentAttempt.getApplicationResourceUsageReport();
String diags = UNAVAILABLE;
if (allowAccess) {
if (this.currentAttempt != null) {
trackingUrl = this.currentAttempt.getTrackingUrl();
origTrackingUrl = this.currentAttempt.getOriginalTrackingUrl();
clientToken = this.currentAttempt.getClientToken();
host = this.currentAttempt.getHost();
rpcPort = this.currentAttempt.getRpcPort();
appUsageReport = currentAttempt.getApplicationResourceUsageReport();
}
diags = this.diagnostics.toString();
} else {
appUsageReport = DUMMY_APPLICATION_RESOURCE_USAGE_REPORT;
}
return BuilderUtils.newApplicationReport(this.applicationId, this.user,
this.queue, this.name, host, rpcPort, clientToken,
createApplicationState(this.stateMachine.getCurrentState()),
this.diagnostics.toString(), trackingUrl,
diags, trackingUrl,
this.startTime, this.finishTime, finishState, appUsageReport,
origTrackingUrl);
} finally {

View File

@ -517,18 +517,12 @@ public ApplicationResourceUsageReport getApplicationResourceUsageReport() {
}
}
}
ApplicationResourceUsageReport appResources =
recordFactory.newRecordInstance(ApplicationResourceUsageReport.class);
appResources.setNumUsedContainers(numUsedContainers);
appResources.setNumReservedContainers(numReservedContainers);
appResources.setUsedResources(
Resources.createResource(currentConsumption));
appResources.setReservedResources(
Resources.createResource(reservedResources));
appResources.setNeededResources(
return BuilderUtils.newApplicationResourceUsageReport(
numUsedContainers, numReservedContainers,
Resources.createResource(currentConsumption),
Resources.createResource(reservedResources),
Resources.createResource(currentConsumption + reservedResources));
return appResources;
} finally {
this.readLock.unlock();
}

View File

@ -22,6 +22,7 @@
import java.net.InetSocketAddress;
import java.security.PrivilegedExceptionAction;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import junit.framework.Assert;
@ -40,6 +41,8 @@
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.Resource;
@ -65,6 +68,7 @@ public class TestApplicationACLs {
private static final String SUPER_USER = "superUser";
private static final String FRIENDLY_GROUP = "friendly-group";
private static final String SUPER_GROUP = "superGroup";
private static final String UNAVAILABLE = "N/A";
private static final Log LOG = LogFactory.getLog(TestApplicationACLs.class);
@ -298,20 +302,20 @@ private void verifyEnemyAccess() throws Exception {
ClientRMProtocol enemyRmClient = getRMClientForUser(ENEMY);
// View as the enemy
try {
enemyRmClient.getApplicationReport(appReportRequest);
Assert.fail("App view by the enemy should fail!!");
} catch (YarnRemoteException e) {
LOG.info("Got exception while viewing app as the enemy", e);
Assert.assertEquals("User enemy cannot perform operation VIEW_APP on "
+ applicationId, e.getMessage());
}
ApplicationReport appReport = enemyRmClient.getApplicationReport(
appReportRequest).getApplicationReport();
verifyEnemyAppReport(appReport);
// List apps as enemy
Assert.assertEquals("App view by enemy should not list any apps!!", 0,
enemyRmClient.getAllApplications(
recordFactory.newRecordInstance(GetAllApplicationsRequest.class))
.getApplicationList().size());
List<ApplicationReport> appReports = enemyRmClient
.getAllApplications(recordFactory
.newRecordInstance(GetAllApplicationsRequest.class))
.getApplicationList();
Assert.assertEquals("App view by enemy should list the apps!!", 4,
appReports.size());
for (ApplicationReport report : appReports) {
verifyEnemyAppReport(report);
}
// Kill app as the enemy
try {
@ -319,11 +323,37 @@ private void verifyEnemyAccess() throws Exception {
Assert.fail("App killing by the enemy should fail!!");
} catch (YarnRemoteException e) {
LOG.info("Got exception while killing app as the enemy", e);
Assert.assertEquals(
"User enemy cannot perform operation MODIFY_APP on "
+ applicationId, e.getMessage());
Assert.assertEquals("User enemy cannot perform operation MODIFY_APP on "
+ applicationId, e.getMessage());
}
rmClient.forceKillApplication(finishAppRequest);
}
private void verifyEnemyAppReport(ApplicationReport appReport) {
Assert.assertEquals("Enemy should not see app host!",
UNAVAILABLE, appReport.getHost());
Assert.assertEquals("Enemy should not see app rpc port!",
-1, appReport.getRpcPort());
Assert.assertEquals("Enemy should not see app client token!",
UNAVAILABLE, appReport.getClientToken());
Assert.assertEquals("Enemy should not see app diagnostics!",
UNAVAILABLE, appReport.getDiagnostics());
Assert.assertEquals("Enemy should not see app tracking url!",
UNAVAILABLE, appReport.getTrackingUrl());
Assert.assertEquals("Enemy should not see app original tracking url!",
UNAVAILABLE, appReport.getOriginalTrackingUrl());
ApplicationResourceUsageReport usageReport =
appReport.getApplicationResourceUsageReport();
Assert.assertEquals("Enemy should not see app used containers",
-1, usageReport.getNumUsedContainers());
Assert.assertEquals("Enemy should not see app reserved containers",
-1, usageReport.getNumReservedContainers());
Assert.assertEquals("Enemy should not see app used resources",
-1, usageReport.getUsedResources().getMemory());
Assert.assertEquals("Enemy should not see app reserved resources",
-1, usageReport.getReservedResources().getMemory());
Assert.assertEquals("Enemy should not see app needed resources",
-1, usageReport.getNeededResources().getMemory());
}
}

View File

@ -207,7 +207,7 @@ public String getTrackingUrl() {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public ApplicationReport createAndGetApplicationReport() {
public ApplicationReport createAndGetApplicationReport(boolean allowAccess) {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override

View File

@ -119,7 +119,7 @@ public void setCurrentAppAttempt(RMAppAttempt attempt) {
}
@Override
public ApplicationReport createAndGetApplicationReport() {
public ApplicationReport createAndGetApplicationReport(boolean allowAccess) {
throw new UnsupportedOperationException("Not supported yet.");
}