YARN-7225. Add queue and partition info to RM audit log. Contributed by Eric Payne
This commit is contained in:
parent
d174b91635
commit
2ab611d48b
@ -580,7 +580,8 @@ public SubmitApplicationResponse submitApplication(
|
||||
LOG.warn("Unable to get the current user.", ie);
|
||||
RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST,
|
||||
ie.getMessage(), "ClientRMService",
|
||||
"Exception in submitting application", applicationId, callerContext);
|
||||
"Exception in submitting application", applicationId, callerContext,
|
||||
submissionContext.getQueue());
|
||||
throw RPCUtil.getRemoteException(ie);
|
||||
}
|
||||
|
||||
@ -603,7 +604,8 @@ public SubmitApplicationResponse submitApplication(
|
||||
". Flow run should be a long integer", e);
|
||||
RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST,
|
||||
e.getMessage(), "ClientRMService",
|
||||
"Exception in submitting application", applicationId);
|
||||
"Exception in submitting application", applicationId,
|
||||
submissionContext.getQueue());
|
||||
throw RPCUtil.getRemoteException(e);
|
||||
}
|
||||
}
|
||||
@ -662,12 +664,14 @@ public SubmitApplicationResponse submitApplication(
|
||||
LOG.info("Application with id " + applicationId.getId() +
|
||||
" submitted by user " + user);
|
||||
RMAuditLogger.logSuccess(user, AuditConstants.SUBMIT_APP_REQUEST,
|
||||
"ClientRMService", applicationId, callerContext);
|
||||
"ClientRMService", applicationId, callerContext,
|
||||
submissionContext.getQueue());
|
||||
} catch (YarnException e) {
|
||||
LOG.info("Exception in submitting " + applicationId, e);
|
||||
RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST,
|
||||
e.getMessage(), "ClientRMService",
|
||||
"Exception in submitting application", applicationId, callerContext);
|
||||
"Exception in submitting application", applicationId, callerContext,
|
||||
submissionContext.getQueue());
|
||||
throw e;
|
||||
}
|
||||
|
||||
|
@ -40,7 +40,7 @@ public class RMAuditLogger {
|
||||
enum Keys {USER, OPERATION, TARGET, RESULT, IP, PERMISSIONS,
|
||||
DESCRIPTION, APPID, APPATTEMPTID, CONTAINERID,
|
||||
CALLERCONTEXT, CALLERSIGNATURE, RESOURCE, QUEUENAME,
|
||||
INCLUDEAPPS, INCLUDECHILDQUEUES, RECURSIVE}
|
||||
INCLUDEAPPS, INCLUDECHILDQUEUES, RECURSIVE, NODELABEL}
|
||||
|
||||
public static class AuditConstants {
|
||||
static final String SUCCESS = "SUCCESS";
|
||||
@ -98,7 +98,7 @@ static String createSuccessLog(String user, String operation, String target,
|
||||
ApplicationId appId, ApplicationAttemptId attemptId,
|
||||
ContainerId containerId, Resource resource) {
|
||||
return createSuccessLog(user, operation, target, appId, attemptId,
|
||||
containerId, resource, null, Server.getRemoteIp());
|
||||
containerId, resource, null, Server.getRemoteIp(), null, null);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -124,7 +124,7 @@ private static StringBuilder createStringBuilderForSuccessEvent(String user,
|
||||
static String createSuccessLog(String user, String operation, String target,
|
||||
ApplicationId appId, ApplicationAttemptId attemptId,
|
||||
ContainerId containerId, Resource resource, CallerContext callerContext,
|
||||
InetAddress ip) {
|
||||
InetAddress ip, String queueName, String partition) {
|
||||
StringBuilder b =
|
||||
createStringBuilderForSuccessEvent(user, operation, target, ip);
|
||||
if (appId != null) {
|
||||
@ -140,6 +140,12 @@ static String createSuccessLog(String user, String operation, String target,
|
||||
add(Keys.RESOURCE, resource.toString(), b);
|
||||
}
|
||||
appendCallerContext(b, callerContext);
|
||||
if (queueName != null) {
|
||||
add(Keys.QUEUENAME, queueName, b);
|
||||
}
|
||||
if (partition != null) {
|
||||
add(Keys.NODELABEL, partition, b);
|
||||
}
|
||||
return b.toString();
|
||||
}
|
||||
|
||||
@ -202,6 +208,32 @@ public static void logSuccess(String user, String operation, String target,
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a readable and parseable audit log string for a successful event.
|
||||
*
|
||||
* @param user User who made the service request to the ResourceManager
|
||||
* @param operation Operation requested by the user.
|
||||
* @param target The target on which the operation is being performed.
|
||||
* @param appId Application Id in which operation was performed.
|
||||
* @param containerId Container Id in which operation was performed.
|
||||
* @param resource Resource associated with container.
|
||||
* @param queueName Name of queue.
|
||||
* @param partition Name of labeled partition.
|
||||
*
|
||||
* <br><br>
|
||||
* Note that the {@link RMAuditLogger} uses tabs ('\t') as a key-val delimiter
|
||||
* and hence the value fields should not contains tabs ('\t').
|
||||
*/
|
||||
public static void logSuccess(String user, String operation, String target,
|
||||
ApplicationId appId, ContainerId containerId, Resource resource,
|
||||
String queueName, String partition) {
|
||||
if (LOG.isInfoEnabled()) {
|
||||
LOG.info(createSuccessLog(user, operation, target, appId, null,
|
||||
containerId, resource, null, Server.getRemoteIp(), queueName,
|
||||
partition));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a general readable and parseable audit log string for a successful
|
||||
* event.
|
||||
@ -263,12 +295,20 @@ public static void logSuccess(String user, String operation, String target,
|
||||
null, null));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public static void logSuccess(String user, String operation, String target,
|
||||
ApplicationId appId, CallerContext callerContext) {
|
||||
if (LOG.isInfoEnabled()) {
|
||||
LOG.info(createSuccessLog(user, operation, target, appId, null, null,
|
||||
null, callerContext, Server.getRemoteIp()));
|
||||
null, callerContext, Server.getRemoteIp(), null, null));
|
||||
}
|
||||
}
|
||||
|
||||
public static void logSuccess(String user, String operation, String target,
|
||||
ApplicationId appId, CallerContext callerContext, String queueName) {
|
||||
if (LOG.isInfoEnabled()) {
|
||||
LOG.info(createSuccessLog(user, operation, target, appId, null, null,
|
||||
null, callerContext, Server.getRemoteIp(), queueName, null));
|
||||
}
|
||||
}
|
||||
|
||||
@ -296,7 +336,7 @@ public static void logSuccess(String user, String operation, String target,
|
||||
ApplicationId appId, InetAddress ip) {
|
||||
if (LOG.isInfoEnabled()) {
|
||||
LOG.info(createSuccessLog(user, operation, target, appId, null, null,
|
||||
null, null, ip));
|
||||
null, null, ip, null, null));
|
||||
}
|
||||
}
|
||||
|
||||
@ -355,7 +395,7 @@ private static StringBuilder createStringBuilderForFailureLog(String user,
|
||||
static String createFailureLog(String user, String operation, String perm,
|
||||
String target, String description, ApplicationId appId,
|
||||
ApplicationAttemptId attemptId, ContainerId containerId,
|
||||
Resource resource, CallerContext callerContext) {
|
||||
Resource resource, CallerContext callerContext, String queueName) {
|
||||
StringBuilder b = createStringBuilderForFailureLog(user,
|
||||
operation, target, description, perm);
|
||||
if (appId != null) {
|
||||
@ -371,6 +411,9 @@ static String createFailureLog(String user, String operation, String perm,
|
||||
add(Keys.RESOURCE, resource.toString(), b);
|
||||
}
|
||||
appendCallerContext(b, callerContext);
|
||||
if (queueName != null) {
|
||||
add(Keys.QUEUENAME, queueName, b);
|
||||
}
|
||||
return b.toString();
|
||||
}
|
||||
|
||||
@ -381,7 +424,7 @@ static String createFailureLog(String user, String operation, String perm,
|
||||
String target, String description, ApplicationId appId,
|
||||
ApplicationAttemptId attemptId, ContainerId containerId, Resource resource) {
|
||||
return createFailureLog(user, operation, perm, target, description, appId,
|
||||
attemptId, containerId, resource, null);
|
||||
attemptId, containerId, resource, null, null);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -447,13 +490,22 @@ public static void logFailure(String user, String operation, String perm,
|
||||
appId, attemptId, null, null));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public static void logFailure(String user, String operation, String perm,
|
||||
String target, String description, ApplicationId appId,
|
||||
CallerContext callerContext) {
|
||||
if (LOG.isWarnEnabled()) {
|
||||
LOG.warn(createFailureLog(user, operation, perm, target, description,
|
||||
appId, null, null, null, callerContext));
|
||||
appId, null, null, null, callerContext, null));
|
||||
}
|
||||
}
|
||||
|
||||
public static void logFailure(String user, String operation, String perm,
|
||||
String target, String description, ApplicationId appId,
|
||||
CallerContext callerContext, String queueName) {
|
||||
if (LOG.isWarnEnabled()) {
|
||||
LOG.warn(createFailureLog(user, operation, perm, target, description,
|
||||
appId, null, null, null, callerContext, queueName));
|
||||
}
|
||||
}
|
||||
|
||||
@ -480,6 +532,15 @@ public static void logFailure(String user, String operation, String perm,
|
||||
}
|
||||
}
|
||||
|
||||
public static void logFailure(String user, String operation, String perm,
|
||||
String target, String description, ApplicationId appId,
|
||||
String queueName) {
|
||||
if (LOG.isWarnEnabled()) {
|
||||
LOG.warn(createFailureLog(user, operation, perm, target, description,
|
||||
appId, null, null, null, null, queueName));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a readable and parseable audit log string for a failed event.
|
||||
*
|
||||
|
@ -218,9 +218,16 @@ public boolean containerCompleted(RMContainer rmContainer,
|
||||
|
||||
containersToPreempt.remove(containerId);
|
||||
|
||||
// In order to save space in the audit log, only include the partition
|
||||
// if it is not the default partition.
|
||||
String containerPartition = null;
|
||||
if (partition != null && !partition.isEmpty()) {
|
||||
containerPartition = partition;
|
||||
}
|
||||
Resource containerResource = rmContainer.getContainer().getResource();
|
||||
RMAuditLogger.logSuccess(getUser(), AuditConstants.RELEASE_CONTAINER,
|
||||
"SchedulerApp", getApplicationId(), containerId, containerResource);
|
||||
"SchedulerApp", getApplicationId(), containerId, containerResource,
|
||||
getQueueName(), containerPartition);
|
||||
|
||||
// Update usage metrics
|
||||
queue.getMetrics().releaseResources(partition,
|
||||
@ -646,9 +653,17 @@ public boolean apply(Resource cluster, ResourceCommitRequest<FiCaSchedulerApp,
|
||||
+ " host=" + rmContainer.getAllocatedNode().getHost()
|
||||
+ " type=" + allocation.getAllocationLocalityType());
|
||||
}
|
||||
// In order to save space in the audit log, only include the partition
|
||||
// if it is not the default partition.
|
||||
String partition =
|
||||
schedulerContainer.getSchedulerNode().getPartition();
|
||||
if (partition != null && partition.isEmpty()) {
|
||||
partition = null;
|
||||
}
|
||||
RMAuditLogger.logSuccess(getUser(), AuditConstants.ALLOC_CONTAINER,
|
||||
"SchedulerApp", getApplicationId(), containerId,
|
||||
allocation.getAllocatedOrReservedResource());
|
||||
allocation.getAllocatedOrReservedResource(), getQueueName(),
|
||||
partition);
|
||||
} else {
|
||||
// If the rmContainer's state is already updated to RESERVED, this is
|
||||
// a reReservation
|
||||
|
@ -162,7 +162,8 @@ void containerCompleted(RMContainer rmContainer,
|
||||
|
||||
Resource containerResource = rmContainer.getContainer().getResource();
|
||||
RMAuditLogger.logSuccess(getUser(), AuditConstants.RELEASE_CONTAINER,
|
||||
"SchedulerApp", getApplicationId(), containerId, containerResource);
|
||||
"SchedulerApp", getApplicationId(), containerId, containerResource,
|
||||
rmContainer.getQueueName(), null);
|
||||
|
||||
// Update usage metrics
|
||||
queue.getMetrics().releaseResources(
|
||||
@ -479,7 +480,7 @@ public RMContainer allocate(NodeType type, FSSchedulerNode node,
|
||||
}
|
||||
RMAuditLogger.logSuccess(getUser(), AuditConstants.ALLOC_CONTAINER,
|
||||
"SchedulerApp", getApplicationId(), container.getId(),
|
||||
container.getResource());
|
||||
container.getResource(), getQueueName(), null);
|
||||
} finally {
|
||||
writeLock.unlock();
|
||||
}
|
||||
|
@ -99,9 +99,17 @@ public RMContainer allocate(NodeType type, FiCaSchedulerNode node,
|
||||
.getApplicationAttemptId() + " container=" + containerId + " host="
|
||||
+ container.getNodeId().getHost() + " type=" + type);
|
||||
}
|
||||
// In order to save space in the audit log, only include the partition
|
||||
// if it is not the default partition.
|
||||
String partition = null;
|
||||
if (appAMNodePartitionName != null &&
|
||||
!appAMNodePartitionName.isEmpty()) {
|
||||
partition = appAMNodePartitionName;
|
||||
}
|
||||
RMAuditLogger.logSuccess(getUser(),
|
||||
RMAuditLogger.AuditConstants.ALLOC_CONTAINER, "SchedulerApp",
|
||||
getApplicationId(), containerId, container.getResource());
|
||||
getApplicationId(), containerId, container.getResource(),
|
||||
getQueueName(), partition);
|
||||
|
||||
return rmContainer;
|
||||
} finally {
|
||||
|
@ -67,6 +67,7 @@ public class TestRMAuditLogger {
|
||||
private static final Resource RESOURCE = mock(Resource.class);
|
||||
private static final String CALLER_CONTEXT = "context";
|
||||
private static final byte[] CALLER_SIGNATURE = "signature".getBytes();
|
||||
private static final String PARTITION = "label1";
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
@ -132,6 +133,14 @@ private void testSuccessLogFormatHelper(boolean checkIP, ApplicationId appId,
|
||||
ApplicationAttemptId attemptId, ContainerId containerId,
|
||||
CallerContext callerContext, Resource resource, InetAddress remoteIp,
|
||||
RMAuditLogger.ArgsBuilder args) {
|
||||
testSuccessLogFormatHelper(checkIP, appId, attemptId, containerId,
|
||||
callerContext, resource, remoteIp, args, null, null);
|
||||
}
|
||||
|
||||
private void testSuccessLogFormatHelper(boolean checkIP, ApplicationId appId,
|
||||
ApplicationAttemptId attemptId, ContainerId containerId,
|
||||
CallerContext callerContext, Resource resource, InetAddress remoteIp,
|
||||
RMAuditLogger.ArgsBuilder args, String queueName, String partition) {
|
||||
String sLog;
|
||||
InetAddress tmpIp = checkIP ? remoteIp : null;
|
||||
if (args != null) {
|
||||
@ -139,7 +148,8 @@ private void testSuccessLogFormatHelper(boolean checkIP, ApplicationId appId,
|
||||
tmpIp, args);
|
||||
} else {
|
||||
sLog = RMAuditLogger.createSuccessLog(USER, OPERATION, TARGET, appId,
|
||||
attemptId, containerId, resource, callerContext, tmpIp);
|
||||
attemptId, containerId, resource, callerContext, tmpIp, queueName,
|
||||
partition);
|
||||
}
|
||||
StringBuilder expLog = new StringBuilder();
|
||||
expLog.append("USER=test\t");
|
||||
@ -177,6 +187,13 @@ private void testSuccessLogFormatHelper(boolean checkIP, ApplicationId appId,
|
||||
if (args != null) {
|
||||
expLog.append("\tQUEUENAME=root");
|
||||
expLog.append("\tRECURSIVE=true");
|
||||
} else {
|
||||
if (queueName != null) {
|
||||
expLog.append("\tQUEUENAME=" + QUEUE);
|
||||
}
|
||||
}
|
||||
if (partition != null) {
|
||||
expLog.append("\tNODELABEL=" + PARTITION);
|
||||
}
|
||||
assertEquals(expLog.toString(), sLog);
|
||||
}
|
||||
@ -258,6 +275,8 @@ private void testSuccessLogFormat(boolean checkIP) {
|
||||
.append(Keys.QUEUENAME, QUEUE).append(Keys.RECURSIVE, "true");
|
||||
testSuccessLogFormatHelper(checkIP, null, null, null, null, null,
|
||||
Server.getRemoteIp(), args);
|
||||
testSuccessLogFormatHelper(checkIP, null, null, null, null, null,
|
||||
Server.getRemoteIp(), null, QUEUE, PARTITION);
|
||||
testSuccessLogFormatHelperWithIP(checkIP, APPID, ATTEMPTID, CONTAINERID);
|
||||
testSuccessLogNulls(checkIP);
|
||||
}
|
||||
@ -283,7 +302,7 @@ private void testFailureLogFormatHelper(boolean checkIP, ApplicationId appId,
|
||||
RMAuditLogger.ArgsBuilder args) {
|
||||
String fLog = args == null ?
|
||||
RMAuditLogger.createFailureLog(USER, OPERATION, PERM, TARGET, DESC,
|
||||
appId, attemptId, containerId, resource, callerContext) :
|
||||
appId, attemptId, containerId, resource, callerContext, null) :
|
||||
RMAuditLogger.createFailureLog(USER, OPERATION, PERM, TARGET, DESC,
|
||||
args);
|
||||
StringBuilder expLog = new StringBuilder();
|
||||
|
Loading…
Reference in New Issue
Block a user