YARN-937. Fix unmanaged AM in non-secure/secure setup post YARN-701. (tucu)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1507706 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
ae8f07d47b
commit
f179afc68d
@ -90,7 +90,7 @@ private ApplicationReport getUnknownApplicationReport() {
|
||||
return ApplicationReport.newInstance(unknownAppId, unknownAttemptId,
|
||||
"N/A", "N/A", "N/A", "N/A", 0, null, YarnApplicationState.NEW, "N/A",
|
||||
"N/A", 0, 0, FinalApplicationStatus.UNDEFINED, null, "N/A", 0.0f,
|
||||
YarnConfiguration.DEFAULT_APPLICATION_TYPE);
|
||||
YarnConfiguration.DEFAULT_APPLICATION_TYPE, null);
|
||||
}
|
||||
|
||||
NotRunningJob(ApplicationReport applicationReport, JobState jobState) {
|
||||
|
@ -54,6 +54,7 @@
|
||||
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
@ -285,6 +286,12 @@ public ApplicationReport getApplicationReport(ApplicationId appId)
|
||||
return client.getApplicationReport(appId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Token<AMRMTokenIdentifier> getAMRMToken(ApplicationId appId)
|
||||
throws YarnException, IOException {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ApplicationReport> getApplications() throws YarnException,
|
||||
IOException {
|
||||
|
@ -430,7 +430,7 @@ private ApplicationReport getFinishedApplicationReport() {
|
||||
return ApplicationReport.newInstance(appId, attemptId, "user", "queue",
|
||||
"appname", "host", 124, null, YarnApplicationState.FINISHED,
|
||||
"diagnostics", "url", 0, 0, FinalApplicationStatus.SUCCEEDED, null,
|
||||
"N/A", 0.0f, YarnConfiguration.DEFAULT_APPLICATION_TYPE);
|
||||
"N/A", 0.0f, YarnConfiguration.DEFAULT_APPLICATION_TYPE, null);
|
||||
}
|
||||
|
||||
private ApplicationReport getRunningApplicationReport(String host, int port) {
|
||||
@ -440,7 +440,7 @@ private ApplicationReport getRunningApplicationReport(String host, int port) {
|
||||
return ApplicationReport.newInstance(appId, attemptId, "user", "queue",
|
||||
"appname", host, port, null, YarnApplicationState.RUNNING, "diagnostics",
|
||||
"url", 0, 0, FinalApplicationStatus.UNDEFINED, null, "N/A", 0.0f,
|
||||
YarnConfiguration.DEFAULT_APPLICATION_TYPE);
|
||||
YarnConfiguration.DEFAULT_APPLICATION_TYPE, null);
|
||||
}
|
||||
|
||||
private ResourceMgrDelegate getRMDelegate() throws IOException {
|
||||
|
@ -822,6 +822,8 @@ Release 2.1.0-beta - 2013-07-02
|
||||
YARN-909. Disable TestLinuxContainerExecutorWithMocks on Windows. (Chuan Liu
|
||||
via cnauroth)
|
||||
|
||||
YARN-937. Fix unmanaged AM in non-secure/secure setup post YARN-701. (tucu)
|
||||
|
||||
Release 2.0.5-alpha - 06/06/2013
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
@ -58,7 +58,7 @@ public static ApplicationReport newInstance(ApplicationId applicationId,
|
||||
YarnApplicationState state, String diagnostics, String url,
|
||||
long startTime, long finishTime, FinalApplicationStatus finalStatus,
|
||||
ApplicationResourceUsageReport appResources, String origTrackingUrl,
|
||||
float progress, String applicationType) {
|
||||
float progress, String applicationType, Token amRmToken) {
|
||||
ApplicationReport report = Records.newRecord(ApplicationReport.class);
|
||||
report.setApplicationId(applicationId);
|
||||
report.setCurrentApplicationAttemptId(applicationAttemptId);
|
||||
@ -78,6 +78,7 @@ public static ApplicationReport newInstance(ApplicationId applicationId,
|
||||
report.setOriginalTrackingUrl(origTrackingUrl);
|
||||
report.setProgress(progress);
|
||||
report.setApplicationType(applicationType);
|
||||
report.setAMRMToken(amRmToken);
|
||||
return report;
|
||||
}
|
||||
|
||||
@ -319,4 +320,33 @@ public static ApplicationReport newInstance(ApplicationId applicationId,
|
||||
@Private
|
||||
@Unstable
|
||||
public abstract void setApplicationType(String applicationType);
|
||||
|
||||
@Private
|
||||
@Stable
|
||||
public abstract void setAMRMToken(Token amRmToken);
|
||||
|
||||
/**
|
||||
* Get the AMRM token of the application.
|
||||
* <p/>
|
||||
* The AMRM token is required for AM to RM scheduling operations. For
|
||||
* managed Application Masters Yarn takes care of injecting it. For unmanaged
|
||||
* Applications Masters, the token must be obtained via this method and set
|
||||
* in the {@link org.apache.hadoop.security.UserGroupInformation} of the
|
||||
* current user.
|
||||
* <p/>
|
||||
* The AMRM token will be returned only if all the following conditions are
|
||||
* met:
|
||||
* <li>
|
||||
* <ul>the requester is the owner of the ApplicationMaster</ul>
|
||||
* <ul>the application master is an unmanaged ApplicationMaster</ul>
|
||||
* <ul>the application master is in ACCEPTED state</ul>
|
||||
* </li>
|
||||
* Else this method returns NULL.
|
||||
*
|
||||
* @return the AM to RM token if available.
|
||||
*/
|
||||
@Public
|
||||
@Stable
|
||||
public abstract Token getAMRMToken();
|
||||
|
||||
}
|
||||
|
@ -149,6 +149,7 @@ message ApplicationReportProto {
|
||||
optional ApplicationAttemptIdProto currentApplicationAttemptId = 16;
|
||||
optional float progress = 17;
|
||||
optional string applicationType = 18;
|
||||
optional hadoop.common.TokenProto am_rm_token = 19;
|
||||
}
|
||||
|
||||
enum NodeStateProto {
|
||||
|
@ -19,7 +19,9 @@
|
||||
package org.apache.hadoop.yarn.applications.unmanagedamlauncher;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.DataOutputStream;
|
||||
import java.io.File;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStreamReader;
|
||||
import java.net.InetAddress;
|
||||
@ -36,6 +38,9 @@
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileUtil;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.yarn.api.ApplicationConstants;
|
||||
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
|
||||
@ -51,6 +56,7 @@
|
||||
import org.apache.hadoop.yarn.client.api.YarnClient;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
|
||||
/**
|
||||
@ -166,7 +172,31 @@ public boolean init(String[] args) throws ParseException {
|
||||
return true;
|
||||
}
|
||||
|
||||
public void launchAM(ApplicationAttemptId attemptId) throws IOException {
|
||||
public void launchAM(ApplicationAttemptId attemptId)
|
||||
throws IOException, YarnException {
|
||||
ApplicationReport report =
|
||||
rmClient.getApplicationReport(attemptId.getApplicationId());
|
||||
if (report.getYarnApplicationState() != YarnApplicationState.ACCEPTED) {
|
||||
throw new YarnException(
|
||||
"Umanaged AM must be in ACCEPTED state before launching");
|
||||
}
|
||||
Credentials credentials = new Credentials();
|
||||
Token<AMRMTokenIdentifier> token =
|
||||
rmClient.getAMRMToken(attemptId.getApplicationId());
|
||||
credentials.addToken(token.getService(), token);
|
||||
File tokenFile = File.createTempFile("unmanagedAMRMToken","",
|
||||
new File(System.getProperty("user.dir")));
|
||||
try {
|
||||
FileUtil.chmod(tokenFile.getAbsolutePath(), "600");
|
||||
} catch (InterruptedException ex) {
|
||||
throw new RuntimeException(ex);
|
||||
}
|
||||
tokenFile.deleteOnExit();
|
||||
DataOutputStream os = new DataOutputStream(new FileOutputStream(tokenFile,
|
||||
true));
|
||||
credentials.writeTokenStorageToStream(os);
|
||||
os.close();
|
||||
|
||||
Map<String, String> env = System.getenv();
|
||||
ArrayList<String> envAMList = new ArrayList<String>();
|
||||
boolean setClasspath = false;
|
||||
@ -196,6 +226,9 @@ public void launchAM(ApplicationAttemptId attemptId) throws IOException {
|
||||
envAMList.add(ApplicationConstants.APP_SUBMIT_TIME_ENV + "="
|
||||
+ System.currentTimeMillis());
|
||||
|
||||
envAMList.add(ApplicationConstants.CONTAINER_TOKEN_FILE_ENV_NAME + "=" +
|
||||
tokenFile.getAbsolutePath());
|
||||
|
||||
String[] envAM = new String[envAMList.size()];
|
||||
Process amProc = Runtime.getRuntime().exec(amCmd, envAMList.toArray(envAM));
|
||||
|
||||
|
@ -40,7 +40,6 @@
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestUnmanagedAMLauncher {
|
||||
/**
|
||||
private static final Log LOG = LogFactory
|
||||
.getLog(TestUnmanagedAMLauncher.class);
|
||||
|
||||
@ -185,5 +184,5 @@ public void testDSShellError() throws Exception {
|
||||
// Expected
|
||||
}
|
||||
}
|
||||
*/
|
||||
|
||||
}
|
||||
|
@ -39,6 +39,7 @@
|
||||
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
|
||||
import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
||||
|
||||
@InterfaceAudience.Public
|
||||
@InterfaceStability.Stable
|
||||
@ -140,6 +141,32 @@ public abstract void killApplication(ApplicationId applicationId) throws YarnExc
|
||||
public abstract ApplicationReport getApplicationReport(ApplicationId appId)
|
||||
throws YarnException, IOException;
|
||||
|
||||
/**
|
||||
* Get the AMRM token of the application.
|
||||
* <p/>
|
||||
* The AMRM token is required for AM to RM scheduling operations. For
|
||||
* managed Application Masters Yarn takes care of injecting it. For unmanaged
|
||||
* Applications Masters, the token must be obtained via this method and set
|
||||
* in the {@link org.apache.hadoop.security.UserGroupInformation} of the
|
||||
* current user.
|
||||
* <p/>
|
||||
* The AMRM token will be returned only if all the following conditions are
|
||||
* met:
|
||||
* <li>
|
||||
* <ul>the requester is the owner of the ApplicationMaster</ul>
|
||||
* <ul>the application master is an unmanaged ApplicationMaster</ul>
|
||||
* <ul>the application master is in ACCEPTED state</ul>
|
||||
* </li>
|
||||
* Else this method returns NULL.
|
||||
*
|
||||
* @param appId {@link ApplicationId} of the application to get the AMRM token
|
||||
* @return the AMRM token if available
|
||||
* @throws YarnException
|
||||
* @throws IOException
|
||||
*/
|
||||
public abstract org.apache.hadoop.security.token.Token<AMRMTokenIdentifier>
|
||||
getAMRMToken(ApplicationId appId) throws YarnException, IOException;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* Get a report (ApplicationReport) of all Applications in the cluster.
|
||||
|
@ -65,6 +65,8 @@
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
@ -195,6 +197,21 @@ public ApplicationReport getApplicationReport(ApplicationId appId)
|
||||
return response.getApplicationReport();
|
||||
}
|
||||
|
||||
public org.apache.hadoop.security.token.Token<AMRMTokenIdentifier>
|
||||
getAMRMToken(ApplicationId appId) throws YarnException, IOException {
|
||||
org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> amrmToken = null;
|
||||
ApplicationReport report = getApplicationReport(appId);
|
||||
Token token = report.getAMRMToken();
|
||||
if (token != null) {
|
||||
InetSocketAddress address = getConfig().getSocketAddr(
|
||||
YarnConfiguration.RM_SCHEDULER_ADDRESS,
|
||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
|
||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
|
||||
amrmToken = ConverterUtils.convertFromYarn(token, address);
|
||||
}
|
||||
return amrmToken;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ApplicationReport> getApplications() throws YarnException,
|
||||
IOException {
|
||||
|
@ -25,6 +25,7 @@
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
@ -34,6 +35,7 @@
|
||||
import junit.framework.Assert;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
|
||||
@ -44,15 +46,20 @@
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||
import org.apache.hadoop.yarn.client.api.YarnClient;
|
||||
import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
|
||||
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.server.MiniYARNCluster;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
import org.apache.log4j.Level;
|
||||
import org.apache.log4j.LogManager;
|
||||
import org.apache.log4j.Logger;
|
||||
@ -237,7 +244,7 @@ private List<ApplicationReport> createAppReports() {
|
||||
applicationId, ApplicationAttemptId.newInstance(applicationId, 1),
|
||||
"user", "queue", "appname", "host", 124, null,
|
||||
YarnApplicationState.FINISHED, "diagnostics", "url", 0, 0,
|
||||
FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.53789f, "YARN");
|
||||
FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.53789f, "YARN", null);
|
||||
List<ApplicationReport> applicationReports =
|
||||
new ArrayList<ApplicationReport>();
|
||||
applicationReports.add(newApplicationReport);
|
||||
@ -247,7 +254,8 @@ private List<ApplicationReport> createAppReports() {
|
||||
applicationId2, ApplicationAttemptId.newInstance(applicationId2, 2),
|
||||
"user2", "queue2", "appname2", "host2", 125, null,
|
||||
YarnApplicationState.FINISHED, "diagnostics2", "url2", 2, 2,
|
||||
FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.63789f, "NON-YARN");
|
||||
FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.63789f, "NON-YARN",
|
||||
null);
|
||||
applicationReports.add(newApplicationReport2);
|
||||
|
||||
ApplicationId applicationId3 = ApplicationId.newInstance(1234, 7);
|
||||
@ -255,7 +263,8 @@ private List<ApplicationReport> createAppReports() {
|
||||
applicationId3, ApplicationAttemptId.newInstance(applicationId3, 3),
|
||||
"user3", "queue3", "appname3", "host3", 126, null,
|
||||
YarnApplicationState.FINISHED, "diagnostics3", "url3", 3, 3,
|
||||
FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.73789f, "MAPREDUCE");
|
||||
FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.73789f, "MAPREDUCE",
|
||||
null);
|
||||
applicationReports.add(newApplicationReport3);
|
||||
return applicationReports;
|
||||
}
|
||||
@ -281,4 +290,107 @@ private GetApplicationsResponse getApplicationReports(
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 30000)
|
||||
public void testAMMRTokens() throws Exception {
|
||||
MiniYARNCluster cluster = new MiniYARNCluster("testMRAMTokens", 1, 1, 1);
|
||||
YarnClient rmClient = null;
|
||||
try {
|
||||
cluster.init(new YarnConfiguration());
|
||||
cluster.start();
|
||||
final Configuration yarnConf = cluster.getConfig();
|
||||
rmClient = YarnClient.createYarnClient();
|
||||
rmClient.init(yarnConf);
|
||||
rmClient.start();
|
||||
|
||||
ApplicationId appId = createApp(rmClient, false);
|
||||
waitTillAccepted(rmClient, appId);
|
||||
//managed AMs don't return AMRM token
|
||||
Assert.assertNull(rmClient.getAMRMToken(appId));
|
||||
|
||||
appId = createApp(rmClient, true);
|
||||
waitTillAccepted(rmClient, appId);
|
||||
//unmanaged AMs do return AMRM token
|
||||
Assert.assertNotNull(rmClient.getAMRMToken(appId));
|
||||
|
||||
UserGroupInformation other =
|
||||
UserGroupInformation.createUserForTesting("foo", new String[]{});
|
||||
appId = other.doAs(
|
||||
new PrivilegedExceptionAction<ApplicationId>() {
|
||||
@Override
|
||||
public ApplicationId run() throws Exception {
|
||||
YarnClient rmClient = YarnClient.createYarnClient();
|
||||
rmClient.init(yarnConf);
|
||||
rmClient.start();
|
||||
ApplicationId appId = createApp(rmClient, true);
|
||||
waitTillAccepted(rmClient, appId);
|
||||
//unmanaged AMs do return AMRM token
|
||||
Assert.assertNotNull(rmClient.getAMRMToken(appId));
|
||||
return appId;
|
||||
}
|
||||
});
|
||||
//other users don't get AMRM token
|
||||
Assert.assertNull(rmClient.getAMRMToken(appId));
|
||||
} finally {
|
||||
if (rmClient != null) {
|
||||
rmClient.stop();
|
||||
}
|
||||
cluster.stop();
|
||||
}
|
||||
}
|
||||
|
||||
private ApplicationId createApp(YarnClient rmClient, boolean unmanaged)
|
||||
throws Exception {
|
||||
YarnClientApplication newApp = rmClient.createApplication();
|
||||
|
||||
ApplicationId appId = newApp.getNewApplicationResponse().getApplicationId();
|
||||
|
||||
// Create launch context for app master
|
||||
ApplicationSubmissionContext appContext
|
||||
= Records.newRecord(ApplicationSubmissionContext.class);
|
||||
|
||||
// set the application id
|
||||
appContext.setApplicationId(appId);
|
||||
|
||||
// set the application name
|
||||
appContext.setApplicationName("test");
|
||||
|
||||
// Set the priority for the application master
|
||||
Priority pri = Records.newRecord(Priority.class);
|
||||
pri.setPriority(1);
|
||||
appContext.setPriority(pri);
|
||||
|
||||
// Set the queue to which this application is to be submitted in the RM
|
||||
appContext.setQueue("default");
|
||||
|
||||
// Set up the container launch context for the application master
|
||||
ContainerLaunchContext amContainer
|
||||
= Records.newRecord(ContainerLaunchContext.class);
|
||||
appContext.setAMContainerSpec(amContainer);
|
||||
appContext.setResource(Resource.newInstance(1024, 1));
|
||||
appContext.setUnmanagedAM(unmanaged);
|
||||
|
||||
// Submit the application to the applications manager
|
||||
rmClient.submitApplication(appContext);
|
||||
|
||||
return appId;
|
||||
}
|
||||
|
||||
private void waitTillAccepted(YarnClient rmClient, ApplicationId appId)
|
||||
throws Exception {
|
||||
try {
|
||||
long start = System.currentTimeMillis();
|
||||
ApplicationReport report = rmClient.getApplicationReport(appId);
|
||||
while (YarnApplicationState.ACCEPTED != report.getYarnApplicationState()) {
|
||||
if (System.currentTimeMillis() - start > 20 * 1000) {
|
||||
throw new Exception("App '" + appId +
|
||||
"' time out, failed to reach ACCEPTED state");
|
||||
}
|
||||
Thread.sleep(200);
|
||||
report = rmClient.getApplicationReport(appId);
|
||||
}
|
||||
} catch (Exception ex) {
|
||||
throw new Exception(ex);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -80,7 +80,7 @@ public void testGetApplicationReport() throws Exception {
|
||||
applicationId, ApplicationAttemptId.newInstance(applicationId, 1),
|
||||
"user", "queue", "appname", "host", 124, null,
|
||||
YarnApplicationState.FINISHED, "diagnostics", "url", 0, 0,
|
||||
FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.53789f, "YARN");
|
||||
FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.53789f, "YARN", null);
|
||||
when(client.getApplicationReport(any(ApplicationId.class))).thenReturn(
|
||||
newApplicationReport);
|
||||
int result = cli.run(new String[] { "-status", applicationId.toString() });
|
||||
@ -134,7 +134,7 @@ public void testGetApplications() throws Exception {
|
||||
applicationId, ApplicationAttemptId.newInstance(applicationId, 1),
|
||||
"user", "queue", "appname", "host", 124, null,
|
||||
YarnApplicationState.FINISHED, "diagnostics", "url", 0, 0,
|
||||
FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.53789f, "YARN");
|
||||
FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.53789f, "YARN", null);
|
||||
List<ApplicationReport> applicationReports = new ArrayList<ApplicationReport>();
|
||||
applicationReports.add(newApplicationReport);
|
||||
|
||||
@ -143,7 +143,8 @@ public void testGetApplications() throws Exception {
|
||||
applicationId2, ApplicationAttemptId.newInstance(applicationId2, 2),
|
||||
"user2", "queue2", "appname2", "host2", 125, null,
|
||||
YarnApplicationState.FINISHED, "diagnostics2", "url2", 2, 2,
|
||||
FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.63789f, "NON-YARN");
|
||||
FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.63789f, "NON-YARN",
|
||||
null);
|
||||
applicationReports.add(newApplicationReport2);
|
||||
|
||||
ApplicationId applicationId3 = ApplicationId.newInstance(1234, 7);
|
||||
@ -151,7 +152,8 @@ public void testGetApplications() throws Exception {
|
||||
applicationId3, ApplicationAttemptId.newInstance(applicationId3, 3),
|
||||
"user3", "queue3", "appname3", "host3", 126, null,
|
||||
YarnApplicationState.FINISHED, "diagnostics3", "url3", 3, 3,
|
||||
FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.73789f, "MAPREDUCE");
|
||||
FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.73789f, "MAPREDUCE",
|
||||
null);
|
||||
applicationReports.add(newApplicationReport3);
|
||||
|
||||
Set<String> appType1 = new HashSet<String>();
|
||||
|
@ -46,6 +46,7 @@ public class ApplicationReportPBImpl extends ApplicationReport {
|
||||
private ApplicationId applicationId;
|
||||
private ApplicationAttemptId currentApplicationAttemptId;
|
||||
private Token clientToAMToken = null;
|
||||
private Token amRmToken = null;
|
||||
|
||||
public ApplicationReportPBImpl() {
|
||||
builder = ApplicationReportProto.newBuilder();
|
||||
@ -228,7 +229,20 @@ public String getApplicationType() {
|
||||
}
|
||||
return p.getApplicationType();
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public Token getAMRMToken() {
|
||||
ApplicationReportProtoOrBuilder p = viaProto ? proto : builder;
|
||||
if (amRmToken != null) {
|
||||
return amRmToken;
|
||||
}
|
||||
if (!p.hasAmRmToken()) {
|
||||
return null;
|
||||
}
|
||||
amRmToken = convertFromProtoFormat(p.getAmRmToken());
|
||||
return amRmToken;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setApplicationId(ApplicationId applicationId) {
|
||||
maybeInitBuilder();
|
||||
@ -377,6 +391,15 @@ public void setProgress(float progress) {
|
||||
builder.setProgress(progress);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setAMRMToken(Token amRmToken) {
|
||||
maybeInitBuilder();
|
||||
if (amRmToken == null) {
|
||||
builder.clearAmRmToken();
|
||||
}
|
||||
this.amRmToken = amRmToken;
|
||||
}
|
||||
|
||||
public ApplicationReportProto getProto() {
|
||||
mergeLocalToProto();
|
||||
proto = viaProto ? proto : builder.build();
|
||||
@ -420,6 +443,11 @@ private void mergeLocalToBuilder() {
|
||||
builder.getClientToAmToken())) {
|
||||
builder.setClientToAmToken(convertToProtoFormat(this.clientToAMToken));
|
||||
}
|
||||
if (this.amRmToken != null
|
||||
&& !((TokenPBImpl) this.amRmToken).getProto().equals(
|
||||
builder.getAmRmToken())) {
|
||||
builder.setAmRmToken(convertToProtoFormat(this.amRmToken));
|
||||
}
|
||||
}
|
||||
|
||||
private void mergeLocalToProto() {
|
||||
|
@ -46,6 +46,7 @@ public void testApplicationReport() {
|
||||
appReport2.setCurrentApplicationAttemptId(null);
|
||||
Assert.assertNull(appReport2.getCurrentApplicationAttemptId());
|
||||
Assert.assertNotSame(appReport2, appReport3);
|
||||
Assert.assertNull(appReport1.getAMRMToken());
|
||||
}
|
||||
|
||||
protected static ApplicationReport createApplicationReport(
|
||||
@ -57,7 +58,7 @@ protected static ApplicationReport createApplicationReport(
|
||||
ApplicationReport.newInstance(appId, appAttemptId, "user", "queue",
|
||||
"appname", "host", 124, null, YarnApplicationState.FINISHED,
|
||||
"diagnostics", "url", 0, 0, FinalApplicationStatus.SUCCEEDED, null,
|
||||
"N/A", 0.53789f, YarnConfiguration.DEFAULT_APPLICATION_TYPE);
|
||||
"N/A", 0.53789f, YarnConfiguration.DEFAULT_APPLICATION_TYPE, null);
|
||||
return appReport;
|
||||
}
|
||||
|
||||
|
@ -231,6 +231,11 @@ public static Token newClientToAMToken(byte[] identifier, String kind,
|
||||
return newToken(Token.class, identifier, kind, password, service);
|
||||
}
|
||||
|
||||
public static Token newAMRMToken(byte[] identifier, String kind,
|
||||
byte[] password, String service) {
|
||||
return newToken(Token.class, identifier, kind, password, service);
|
||||
}
|
||||
|
||||
@Private
|
||||
@VisibleForTesting
|
||||
public static Token newContainerToken(NodeId nodeId,
|
||||
@ -307,7 +312,7 @@ public static ApplicationReport newApplicationReport(
|
||||
String url, long startTime, long finishTime,
|
||||
FinalApplicationStatus finalStatus,
|
||||
ApplicationResourceUsageReport appResources, String origTrackingUrl,
|
||||
float progress, String appType) {
|
||||
float progress, String appType, Token amRmToken) {
|
||||
ApplicationReport report = recordFactory
|
||||
.newRecordInstance(ApplicationReport.class);
|
||||
report.setApplicationId(applicationId);
|
||||
@ -328,6 +333,7 @@ public static ApplicationReport newApplicationReport(
|
||||
report.setOriginalTrackingUrl(origTrackingUrl);
|
||||
report.setProgress(progress);
|
||||
report.setApplicationType(appType);
|
||||
report.setAMRMToken(amRmToken);
|
||||
return report;
|
||||
}
|
||||
|
||||
|
@ -18,6 +18,7 @@
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.EnumSet;
|
||||
@ -32,6 +33,7 @@
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.util.ExitUtil;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
@ -47,6 +49,7 @@
|
||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEvent;
|
||||
@ -60,6 +63,7 @@
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanAppEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
||||
@ -445,6 +449,7 @@ public ApplicationReport createAndGetApplicationReport(boolean allowAccess) {
|
||||
FinalApplicationStatus finishState = getFinalApplicationStatus();
|
||||
String diags = UNAVAILABLE;
|
||||
float progress = 0.0f;
|
||||
org.apache.hadoop.yarn.api.records.Token amrmToken = null;
|
||||
if (allowAccess) {
|
||||
if (this.currentAttempt != null) {
|
||||
currentApplicationAttemptId = this.currentAttempt.getAppAttemptId();
|
||||
@ -466,6 +471,24 @@ public ApplicationReport createAndGetApplicationReport(boolean allowAccess) {
|
||||
progress = currentAttempt.getProgress();
|
||||
}
|
||||
diags = this.diagnostics.toString();
|
||||
|
||||
if (currentAttempt != null &&
|
||||
currentAttempt.getAppAttemptState() == RMAppAttemptState.LAUNCHED) {
|
||||
try {
|
||||
if (getApplicationSubmissionContext().getUnmanagedAM() &&
|
||||
getUser().equals(UserGroupInformation.getCurrentUser().getUserName())) {
|
||||
Token<AMRMTokenIdentifier> token = currentAttempt.getAMRMToken();
|
||||
if (token != null) {
|
||||
amrmToken = BuilderUtils.newAMRMToken(token.getIdentifier(),
|
||||
token.getKind().toString(), token.getPassword(),
|
||||
token.getService().toString());
|
||||
}
|
||||
}
|
||||
} catch (IOException ex) {
|
||||
LOG.warn("UserGroupInformation.getCurrentUser() error: " +
|
||||
ex.toString(), ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (currentApplicationAttemptId == null) {
|
||||
@ -479,7 +502,8 @@ public ApplicationReport createAndGetApplicationReport(boolean allowAccess) {
|
||||
this.name, host, rpcPort, clientToAMToken,
|
||||
createApplicationState(this.stateMachine.getCurrentState()), diags,
|
||||
trackingUrl, this.startTime, this.finishTime, finishState,
|
||||
appUsageReport, origTrackingUrl, progress, this.applicationType);
|
||||
appUsageReport, origTrackingUrl, progress, this.applicationType,
|
||||
amrmToken);
|
||||
} finally {
|
||||
this.readLock.unlock();
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user