From 8676a118a12165ae5a8b80a2a4596c133471ebc1 Mon Sep 17 00:00:00 2001 From: Jian He Date: Mon, 23 Nov 2015 17:18:59 -0800 Subject: [PATCH] YARN-4349. Support CallerContext in YARN. Contributed by Wangda Tan --- .../org/apache/hadoop/util/ToolRunner.java | 6 ++ .../org/apache/hadoop/mapred/YarnChild.java | 4 + .../hadoop/mapreduce/v2/app/MRAppMaster.java | 5 ++ hadoop-yarn-project/CHANGES.txt | 2 + .../metrics/ApplicationMetricsConstants.java | 6 ++ .../resourcemanager/ClientRMService.java | 18 +++-- .../server/resourcemanager/RMAppManager.java | 1 + .../server/resourcemanager/RMAuditLogger.java | 76 +++++++++++++++++-- .../metrics/ApplicationCreatedEvent.java | 11 ++- .../metrics/SystemMetricsPublisher.java | 21 +++-- .../recovery/RMStateStore.java | 11 +-- .../records/ApplicationStateData.java | 20 ++++- .../impl/pb/ApplicationStateDataPBImpl.java | 34 +++++++++ .../server/resourcemanager/rmapp/RMApp.java | 3 + .../resourcemanager/rmapp/RMAppImpl.java | 14 +++- .../scheduler/capacity/CapacityScheduler.java | 7 +- ...yarn_server_resourcemanager_recovery.proto | 2 + .../resourcemanager/TestRMAuditLogger.java | 61 +++++++++++++-- .../applicationsmanager/MockAsm.java | 5 ++ .../metrics/TestSystemMetricsPublisher.java | 5 ++ .../recovery/RMStateStoreTestBase.java | 36 +++++---- .../recovery/TestFSRMStateStore.java | 2 +- .../recovery/TestZKRMStateStore.java | 1 + .../resourcemanager/rmapp/MockRMApp.java | 5 ++ .../rmapp/TestRMAppTransitions.java | 4 +- .../rmcontainer/TestRMContainerImpl.java | 17 ++++- .../scheduler/fifo/TestFifoScheduler.java | 2 + 27 files changed, 318 insertions(+), 61 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ToolRunner.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ToolRunner.java index 16872d0891..8740be49d9 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ToolRunner.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ToolRunner.java @@ -23,6 +23,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.CallerContext; /** * A utility to help run {@link Tool}s. @@ -58,6 +59,11 @@ public class ToolRunner { */ public static int run(Configuration conf, Tool tool, String[] args) throws Exception{ + if (CallerContext.getCurrent() == null) { + CallerContext ctx = new CallerContext.Builder("CLI").build(); + CallerContext.setCurrent(ctx); + } + if(conf == null) { conf = new Configuration(); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java index ea9733c3ef..ec7ade7daa 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java @@ -34,6 +34,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.ipc.CallerContext; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.mapreduce.MRConfig; import org.apache.hadoop.mapreduce.MRJobConfig; @@ -85,6 +86,9 @@ public static void main(String[] args) throws Throwable { long jvmIdLong = Long.parseLong(args[3]); JVMId jvmId = new JVMId(firstTaskid.getJobID(), firstTaskid.getTaskType() == TaskType.MAP, jvmIdLong); + + CallerContext.setCurrent( + new CallerContext.Builder("mr_" + firstTaskid.toString()).build()); // initialize metrics DefaultMetricsSystem.initialize( diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java index 624a69a1db..f8c99f4857 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java @@ -42,6 +42,7 @@ import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.ipc.CallerContext; import org.apache.hadoop.mapred.FileOutputCommitter; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.LocalContainerLauncher; @@ -1544,6 +1545,10 @@ public static void main(String[] args) { ContainerId containerId = ConverterUtils.toContainerId(containerIdStr); ApplicationAttemptId applicationAttemptId = containerId.getApplicationAttemptId(); + if (applicationAttemptId != null) { + CallerContext.setCurrent(new CallerContext.Builder( + "mr_appmaster_" + applicationAttemptId.toString()).build()); + } long appSubmitTime = Long.parseLong(appSubmitTimeStr); diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 9b069db33e..a76c835986 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -253,6 +253,8 @@ Release 2.8.0 - UNRELEASED YARN-4184. Remove update reservation state api from state store as its not used by ReservationSystem (Sean Po via asuresh) + YARN-4349. Support CallerContext in YARN. (wtan via jianhe) + IMPROVEMENTS YARN-644. Basic null check is not performed on passed in arguments before diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/ApplicationMetricsConstants.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/ApplicationMetricsConstants.java index 9ebbfb4ab2..c456e54d52 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/ApplicationMetricsConstants.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/ApplicationMetricsConstants.java @@ -75,6 +75,12 @@ public class ApplicationMetricsConstants { public static final String LATEST_APP_ATTEMPT_EVENT_INFO = "YARN_APPLICATION_LATEST_APP_ATTEMPT"; + + public static final String YARN_APP_CALLER_CONTEXT = + "YARN_APPLICATION_CALLER_CONTEXT"; + + public static final String YARN_APP_CALLER_SIGNATURE = + "YARN_APPLICATION_CALLER_SIGNATURE"; public static final String APP_TAGS_INFO = "YARN_APPLICATION_TAGS"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java index 812267d6a0..40a72dd693 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java @@ -40,6 +40,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.io.Text; +import org.apache.hadoop.ipc.CallerContext; import org.apache.hadoop.ipc.Server; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; @@ -552,6 +553,7 @@ public SubmitApplicationResponse submitApplication( ApplicationSubmissionContext submissionContext = request .getApplicationSubmissionContext(); ApplicationId applicationId = submissionContext.getApplicationId(); + CallerContext callerContext = CallerContext.getCurrent(); // ApplicationSubmissionContext needs to be validated for safety - only // those fields that are independent of the RM's configuration will be @@ -566,7 +568,7 @@ public SubmitApplicationResponse submitApplication( LOG.warn("Unable to get the current user.", ie); RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST, ie.getMessage(), "ClientRMService", - "Exception in submitting application", applicationId); + "Exception in submitting application", applicationId, callerContext); throw RPCUtil.getRemoteException(ie); } @@ -603,13 +605,13 @@ public SubmitApplicationResponse submitApplication( LOG.info("Application with id " + applicationId.getId() + " submitted by user " + user); RMAuditLogger.logSuccess(user, AuditConstants.SUBMIT_APP_REQUEST, - "ClientRMService", applicationId); + "ClientRMService", applicationId, callerContext); } catch (YarnException e) { LOG.info("Exception in submitting application with id " + applicationId.getId(), e); RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST, e.getMessage(), "ClientRMService", - "Exception in submitting application", applicationId); + "Exception in submitting application", applicationId, callerContext); throw e; } @@ -694,6 +696,7 @@ public KillApplicationResponse forceKillApplication( KillApplicationRequest request) throws YarnException { ApplicationId applicationId = request.getApplicationId(); + CallerContext callerContext = CallerContext.getCurrent(); UserGroupInformation callerUGI; try { @@ -702,7 +705,7 @@ public KillApplicationResponse forceKillApplication( LOG.info("Error getting UGI ", ie); RMAuditLogger.logFailure("UNKNOWN", AuditConstants.KILL_APP_REQUEST, "UNKNOWN", "ClientRMService" , "Error getting UGI", - applicationId); + applicationId, callerContext); throw RPCUtil.getRemoteException(ie); } @@ -710,7 +713,7 @@ public KillApplicationResponse forceKillApplication( if (application == null) { RMAuditLogger.logFailure(callerUGI.getUserName(), AuditConstants.KILL_APP_REQUEST, "UNKNOWN", "ClientRMService", - "Trying to kill an absent application", applicationId); + "Trying to kill an absent application", applicationId, callerContext); throw new ApplicationNotFoundException("Trying to kill an absent" + " application " + applicationId); } @@ -721,7 +724,7 @@ public KillApplicationResponse forceKillApplication( AuditConstants.KILL_APP_REQUEST, "User doesn't have permissions to " + ApplicationAccessType.MODIFY_APP.toString(), "ClientRMService", - AuditConstants.UNAUTHORIZED_USER, applicationId); + AuditConstants.UNAUTHORIZED_USER, applicationId, callerContext); throw RPCUtil.getRemoteException(new AccessControlException("User " + callerUGI.getShortUserName() + " cannot perform operation " + ApplicationAccessType.MODIFY_APP.name() + " on " + applicationId)); @@ -729,7 +732,8 @@ public KillApplicationResponse forceKillApplication( if (application.isAppFinalStateStored()) { RMAuditLogger.logSuccess(callerUGI.getShortUserName(), - AuditConstants.KILL_APP_REQUEST, "ClientRMService", applicationId); + AuditConstants.KILL_APP_REQUEST, "ClientRMService", applicationId, + callerContext); return KillApplicationResponse.newInstance(true); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java index 711a7a7c0a..c9ea1b8728 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java @@ -26,6 +26,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.DataInputByteBuffer; +import org.apache.hadoop.ipc.CallerContext; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.StringUtils; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java index 8cb7c80f61..da7816b096 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java @@ -17,10 +17,12 @@ */ package org.apache.hadoop.yarn.server.resourcemanager; +import java.io.UnsupportedEncodingException; import java.net.InetAddress; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.ipc.CallerContext; import org.apache.hadoop.ipc.Server; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -35,7 +37,8 @@ public class RMAuditLogger { private static final Log LOG = LogFactory.getLog(RMAuditLogger.class); static enum Keys {USER, OPERATION, TARGET, RESULT, IP, PERMISSIONS, - DESCRIPTION, APPID, APPATTEMPTID, CONTAINERID} + DESCRIPTION, APPID, APPATTEMPTID, CONTAINERID, + CALLERCONTEXT, CALLERSIGNATURE} public static class AuditConstants { static final String SUCCESS = "SUCCESS"; @@ -69,12 +72,20 @@ public static class AuditConstants { public static final String UPDATE_RESERVATION_REQUEST = "Update Reservation Request"; public static final String DELETE_RESERVATION_REQUEST = "Delete Reservation Request"; } + + static String createSuccessLog(String user, String operation, String target, + ApplicationId appId, ApplicationAttemptId attemptId, + ContainerId containerId) { + return createSuccessLog(user, operation, target, appId, attemptId, + containerId, null); + } /** * A helper api for creating an audit log for a successful event. */ static String createSuccessLog(String user, String operation, String target, - ApplicationId appId, ApplicationAttemptId attemptId, ContainerId containerId) { + ApplicationId appId, ApplicationAttemptId attemptId, + ContainerId containerId, CallerContext callerContext) { StringBuilder b = new StringBuilder(); start(Keys.USER, user, b); addRemoteIP(b); @@ -90,8 +101,32 @@ static String createSuccessLog(String user, String operation, String target, if (containerId != null) { add(Keys.CONTAINERID, containerId.toString(), b); } + appendCallerContext(b, callerContext); return b.toString(); } + + private static void appendCallerContext(StringBuilder sb, CallerContext callerContext) { + String context = null; + byte[] signature = null; + + if (callerContext != null) { + context = callerContext.getContext(); + signature = callerContext.getSignature(); + } + + if (context != null) { + add(Keys.CALLERCONTEXT, context, sb); + } + + if (signature != null) { + try { + String sigStr = new String(signature, "UTF-8"); + add(Keys.CALLERSIGNATURE, sigStr, sb); + } catch (UnsupportedEncodingException e) { + // ignore this signature + } + } + } /** * Create a readable and parseable audit log string for a successful event. @@ -134,6 +169,14 @@ public static void logSuccess(String user, String operation, String target, null)); } } + + public static void logSuccess(String user, String operation, String target, + ApplicationId appId, CallerContext callerContext) { + if (LOG.isInfoEnabled()) { + LOG.info(createSuccessLog(user, operation, target, appId, null, null, + callerContext)); + } + } /** @@ -171,13 +214,11 @@ public static void logSuccess(String user, String operation, String target) { LOG.info(createSuccessLog(user, operation, target, null, null, null)); } } - - /** - * A helper api for creating an audit log for a failure event. - */ + static String createFailureLog(String user, String operation, String perm, String target, String description, ApplicationId appId, - ApplicationAttemptId attemptId, ContainerId containerId) { + ApplicationAttemptId attemptId, ContainerId containerId, + CallerContext callerContext) { StringBuilder b = new StringBuilder(); start(Keys.USER, user, b); addRemoteIP(b); @@ -195,9 +236,20 @@ static String createFailureLog(String user, String operation, String perm, if (containerId != null) { add(Keys.CONTAINERID, containerId.toString(), b); } + appendCallerContext(b, callerContext); return b.toString(); } + /** + * A helper api for creating an audit log for a failure event. + */ + static String createFailureLog(String user, String operation, String perm, + String target, String description, ApplicationId appId, + ApplicationAttemptId attemptId, ContainerId containerId) { + return createFailureLog(user, operation, perm, target, description, appId, + attemptId, containerId, null); + } + /** * Create a readable and parseable audit log string for a failed event. * @@ -246,7 +298,15 @@ public static void logFailure(String user, String operation, String perm, appId, attemptId, null)); } } - + + public static void logFailure(String user, String operation, String perm, + String target, String description, ApplicationId appId, + CallerContext callerContext) { + if (LOG.isWarnEnabled()) { + LOG.warn(createFailureLog(user, operation, perm, target, description, + appId, null, null, callerContext)); + } + } /** * Create a readable and parseable audit log string for a failed event. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ApplicationCreatedEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ApplicationCreatedEvent.java index a684dfc4b4..968a8fd9c8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ApplicationCreatedEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ApplicationCreatedEvent.java @@ -20,6 +20,7 @@ import java.util.Set; +import org.apache.hadoop.ipc.CallerContext; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Priority; @@ -37,6 +38,8 @@ public class ApplicationCreatedEvent extends private Priority applicationPriority; private String appNodeLabelsExpression; private String amNodeLabelsExpression; + private final CallerContext callerContext; + public ApplicationCreatedEvent(ApplicationId appId, String name, @@ -49,7 +52,8 @@ public ApplicationCreatedEvent(ApplicationId appId, boolean unmanagedApplication, Priority applicationPriority, String appNodeLabelsExpression, - String amNodeLabelsExpression) { + String amNodeLabelsExpression, + CallerContext callerContext) { super(SystemMetricsEventType.APP_CREATED, createdTime); this.appId = appId; this.name = name; @@ -62,6 +66,7 @@ public ApplicationCreatedEvent(ApplicationId appId, this.applicationPriority = applicationPriority; this.appNodeLabelsExpression = appNodeLabelsExpression; this.amNodeLabelsExpression = amNodeLabelsExpression; + this.callerContext = callerContext; } @Override @@ -112,4 +117,8 @@ public String getAppNodeLabelsExpression() { public String getAmNodeLabelsExpression() { return amNodeLabelsExpression; } + + public CallerContext getCallerContext() { + return callerContext; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsPublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsPublisher.java index 0f09735e7e..1bf7dbbde1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsPublisher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsPublisher.java @@ -46,7 +46,6 @@ import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants; import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; @@ -114,7 +113,8 @@ public void appCreated(RMApp app, long createdTime) { appSubmissionContext.getUnmanagedAM(), appSubmissionContext.getPriority(), app.getAppNodeLabelExpression(), - app.getAmNodeLabelExpression())); + app.getAmNodeLabelExpression(), + app.getCallerContext())); } } @@ -122,10 +122,9 @@ public void appCreated(RMApp app, long createdTime) { public void appUpdated(RMApp app, long updatedTime) { if (publishSystemMetrics) { dispatcher.getEventHandler() - .handle( - new ApplicationUpdatedEvent(app.getApplicationId(), app - .getQueue(), updatedTime, app - .getApplicationSubmissionContext().getPriority())); + .handle(new ApplicationUpdatedEvent(app.getApplicationId(), + app.getQueue(), updatedTime, + app.getApplicationSubmissionContext().getPriority())); } } @@ -284,6 +283,16 @@ private void publishApplicationCreatedEvent(ApplicationCreatedEvent event) { event.getAppNodeLabelsExpression()); entityInfo.put(ApplicationMetricsConstants.AM_NODE_LABEL_EXPRESSION, event.getAmNodeLabelsExpression()); + if (event.getCallerContext() != null) { + if (event.getCallerContext().getContext() != null) { + entityInfo.put(ApplicationMetricsConstants.YARN_APP_CALLER_CONTEXT, + event.getCallerContext().getContext()); + } + if (event.getCallerContext().getSignature() != null) { + entityInfo.put(ApplicationMetricsConstants.YARN_APP_CALLER_SIGNATURE, + event.getCallerContext().getSignature()); + } + } entity.setOtherInfo(entityInfo); TimelineEvent tEvent = new TimelineEvent(); tEvent.setEventType( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java index 9a2e02a7f4..bced5b8087 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java @@ -37,6 +37,7 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; +import org.apache.hadoop.ipc.CallerContext; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.token.delegation.DelegationKey; import org.apache.hadoop.service.AbstractService; @@ -741,8 +742,8 @@ public synchronized void storeNewApplication(RMApp app) { .getApplicationSubmissionContext(); assert context instanceof ApplicationSubmissionContextPBImpl; ApplicationStateData appState = - ApplicationStateData.newInstance( - app.getSubmitTime(), app.getStartTime(), context, app.getUser()); + ApplicationStateData.newInstance(app.getSubmitTime(), + app.getStartTime(), context, app.getUser(), app.getCallerContext()); dispatcher.getEventHandler().handle(new RMStateStoreAppEvent(appState)); } @@ -964,9 +965,9 @@ public void storeOrUpdateAMRMTokenSecretManager( @SuppressWarnings("unchecked") public synchronized void removeApplication(RMApp app) { ApplicationStateData appState = - ApplicationStateData.newInstance( - app.getSubmitTime(), app.getStartTime(), - app.getApplicationSubmissionContext(), app.getUser()); + ApplicationStateData.newInstance(app.getSubmitTime(), + app.getStartTime(), app.getApplicationSubmissionContext(), + app.getUser(), app.getCallerContext()); for(RMAppAttempt appAttempt : app.getAppAttempts().values()) { appState.attempts.put(appAttempt.getAppAttemptId(), null); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java index 43046a96a9..1d199edcca 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java @@ -20,10 +20,12 @@ import java.util.HashMap; import java.util.Map; + import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Stable; import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.ipc.CallerContext; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; @@ -43,8 +45,8 @@ public abstract class ApplicationStateData { public static ApplicationStateData newInstance(long submitTime, long startTime, String user, - ApplicationSubmissionContext submissionContext, - RMAppState state, String diagnostics, long finishTime) { + ApplicationSubmissionContext submissionContext, RMAppState state, + String diagnostics, long finishTime, CallerContext callerContext) { ApplicationStateData appState = Records.newRecord(ApplicationStateData.class); appState.setSubmitTime(submitTime); appState.setStartTime(startTime); @@ -53,12 +55,20 @@ public static ApplicationStateData newInstance(long submitTime, appState.setState(state); appState.setDiagnostics(diagnostics); appState.setFinishTime(finishTime); + appState.setCallerContext(callerContext); return appState; } + public static ApplicationStateData newInstance(long submitTime, + long startTime, ApplicationSubmissionContext context, String user, + CallerContext callerContext) { + return newInstance(submitTime, startTime, user, context, null, "", 0, + callerContext); + } + public static ApplicationStateData newInstance(long submitTime, long startTime, ApplicationSubmissionContext context, String user) { - return newInstance(submitTime, startTime, user, context, null, "", 0); + return newInstance(submitTime, startTime, context, user, null); } public int getAttemptCount() { @@ -144,4 +154,8 @@ public abstract void setApplicationSubmissionContext( public abstract long getFinishTime(); public abstract void setFinishTime(long finishTime); + + public abstract CallerContext getCallerContext(); + + public abstract void setCallerContext(CallerContext callerContext); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationStateDataPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationStateDataPBImpl.java index d8cbd2384e..15ed770840 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationStateDataPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationStateDataPBImpl.java @@ -18,6 +18,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb; +import org.apache.hadoop.ipc.CallerContext; +import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationStateDataProto; @@ -26,6 +28,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import com.google.protobuf.ByteString; import com.google.protobuf.TextFormat; public class ApplicationStateDataPBImpl extends ApplicationStateData { @@ -209,6 +212,37 @@ public boolean equals(Object other) { } return false; } + + @Override + public CallerContext getCallerContext() { + ApplicationStateDataProtoOrBuilder p = viaProto ? proto : builder; + RpcHeaderProtos.RPCCallerContextProto pbContext = p.getCallerContext(); + if (pbContext != null) { + CallerContext context = new CallerContext.Builder(pbContext.getContext()) + .setSignature(pbContext.getSignature().toByteArray()).build(); + return context; + } + + return null; + } + + @Override + public void setCallerContext(CallerContext callerContext) { + if (callerContext != null) { + maybeInitBuilder(); + + RpcHeaderProtos.RPCCallerContextProto.Builder b = RpcHeaderProtos.RPCCallerContextProto + .newBuilder(); + if (callerContext.getContext() != null) { + b.setContext(callerContext.getContext()); + } + if (callerContext.getSignature() != null) { + b.setSignature(ByteString.copyFrom(callerContext.getSignature())); + } + + builder.setCallerContext(b); + } + } @Override public String toString() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java index 720d863e71..bb0fc347f4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java @@ -22,6 +22,7 @@ import java.util.Map; import java.util.Set; +import org.apache.hadoop.ipc.CallerContext; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -254,4 +255,6 @@ ApplicationReport createAndGetApplicationReport(String clientUserName, String getAmNodeLabelExpression(); String getAppNodeLabelExpression(); + + CallerContext getCallerContext(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index bcfce9a89a..92208496de 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -45,6 +45,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.DataInputByteBuffer; +import org.apache.hadoop.ipc.CallerContext; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; @@ -177,6 +178,8 @@ public class RMAppImpl implements RMApp, Recoverable { private RMAppState targetedFinalState; private RMAppState recoveredFinalState; private ResourceRequest amReq; + + private CallerContext callerContext; Object transitionTodo; @@ -439,6 +442,8 @@ public RMAppImpl(ApplicationId applicationId, RMContext rmContext, this.stateMachine = stateMachineFactory.make(this); + this.callerContext = CallerContext.getCurrent(); + rmContext.getRMApplicationHistoryWriter().applicationStarted(this); rmContext.getSystemMetricsPublisher().appCreated(this, startTime); @@ -806,6 +811,7 @@ public void recover(RMState state) { .getDiagnostics()); this.storedFinishTime = appState.getFinishTime(); this.startTime = appState.getStartTime(); + this.callerContext = appState.getCallerContext(); for(int i=0; i SUBMITTED event RMAppEventType.RECOVER RMState state = new RMState(); ApplicationStateData appState = - ApplicationStateData.newInstance(123, 123, null, "user"); + ApplicationStateData.newInstance(123, 123, null, "user", null); state.getApplicationState().put(application.getApplicationId(), appState); RMAppEvent event = new RMAppRecoverEvent(application.getApplicationId(), state); @@ -1011,7 +1011,7 @@ public void createRMStateForApplications( ApplicationStateData appState = ApplicationStateData.newInstance(app.getSubmitTime(), app.getStartTime(), app.getUser(), app.getApplicationSubmissionContext(), rmAppState, - null, app.getFinishTime()); + null, app.getFinishTime(), null); applicationState.put(app.getApplicationId(), appState); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java index 415e891528..fa0e2ed4bb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java @@ -108,7 +108,11 @@ public void testReleaseWhileRunning() { when(rmContext.getRMApplicationHistoryWriter()).thenReturn(writer); when(rmContext.getRMApps()).thenReturn(rmApps); when(rmContext.getSystemMetricsPublisher()).thenReturn(publisher); - when(rmContext.getYarnConfiguration()).thenReturn(new YarnConfiguration()); + YarnConfiguration conf = new YarnConfiguration(); + conf.setBoolean( + YarnConfiguration.APPLICATION_HISTORY_SAVE_NON_AM_CONTAINER_META_INFO, + true); + when(rmContext.getYarnConfiguration()).thenReturn(conf); RMContainer rmContainer = new RMContainerImpl(container, appAttemptId, nodeId, "user", rmContext); @@ -202,8 +206,14 @@ public void testExpireWhileRunning() { when(rmContext.getContainerAllocationExpirer()).thenReturn(expirer); when(rmContext.getRMApplicationHistoryWriter()).thenReturn(writer); when(rmContext.getSystemMetricsPublisher()).thenReturn(publisher); - when(rmContext.getYarnConfiguration()).thenReturn(new YarnConfiguration()); + + YarnConfiguration conf = new YarnConfiguration(); + conf.setBoolean( + YarnConfiguration.APPLICATION_HISTORY_SAVE_NON_AM_CONTAINER_META_INFO, + true); + when(rmContext.getYarnConfiguration()).thenReturn(conf); when(rmContext.getRMApps()).thenReturn(appMap); + RMContainer rmContainer = new RMContainerImpl(container, appAttemptId, nodeId, "user", rmContext); @@ -388,6 +398,9 @@ public void testExistenceOfResourceRequestInRMContainer() throws Exception { public void testStoreAllContainerMetrics() throws Exception { Configuration conf = new Configuration(); conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1); + conf.setBoolean( + YarnConfiguration.APPLICATION_HISTORY_SAVE_NON_AM_CONTAINER_META_INFO, + true); MockRM rm1 = new MockRM(conf); SystemMetricsPublisher publisher = mock(SystemMetricsPublisher.class); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java index 78322b7cee..8111e11026 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java @@ -177,6 +177,7 @@ public void testAppAttemptMetrics() throws Exception { mock(SystemMetricsPublisher.class)); Configuration conf = new Configuration(); + ((RMContextImpl) rmContext).setScheduler(scheduler); scheduler.setRMContext(rmContext); scheduler.init(conf); scheduler.start(); @@ -309,6 +310,7 @@ public Map getNodes(){ rmContext.setNodeLabelManager(nlm); scheduler.setRMContext(rmContext); + ((RMContextImpl) rmContext).setScheduler(scheduler); scheduler.init(conf); scheduler.start(); scheduler.reinitialize(new Configuration(), rmContext);