YARN-10476. Queue metrics for Unmanaged applications (#2674). Contributed by Cyrus Jackson

This commit is contained in:
Cyrus Jackson 2021-03-19 15:49:05 +05:30 committed by GitHub
parent 4781761dc2
commit cd44e917d0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 446 additions and 114 deletions

View File

@ -102,11 +102,13 @@ public class AppSchedulingInfo {
private final Map<String, String> applicationSchedulingEnvs = new HashMap<>();
private final RMContext rmContext;
private final int retryAttempts;
private boolean unmanagedAM;
public AppSchedulingInfo(ApplicationAttemptId appAttemptId, String user,
Queue queue, AbstractUsersManager abstractUsersManager, long epoch,
ResourceUsage appResourceUsage,
Map<String, String> applicationSchedulingEnvs, RMContext rmContext) {
Map<String, String> applicationSchedulingEnvs, RMContext rmContext,
boolean unmanagedAM) {
this.applicationAttemptId = appAttemptId;
this.applicationId = appAttemptId.getApplicationId();
this.queue = queue;
@ -120,6 +122,7 @@ public class AppSchedulingInfo {
this.retryAttempts = rmContext.getYarnConfiguration().getInt(
YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_RETRY_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_PLACEMENT_CONSTRAINTS_RETRY_ATTEMPTS);
this.unmanagedAM = unmanagedAM;
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
updateContext = new ContainerUpdateContext(this);
@ -156,6 +159,14 @@ public class AppSchedulingInfo {
return pending;
}
/**
 * Updates the flag recording whether this application runs with an
 * unmanaged AM.
 *
 * @param unmanagedAM new value of the unmanaged-AM flag
 */
public void setUnmanagedAM(boolean unmanagedAM) {
  this.unmanagedAM = unmanagedAM;
}
/**
 * @return the unmanaged-AM flag supplied at construction time or through
 *         {@link #setUnmanagedAM(boolean)}
 */
public boolean isUnmanagedAM() {
  return this.unmanagedAM;
}
/**
 * @return the set held in {@code requestedPartitions}; the reference is
 *         returned as-is, without copying
 */
public Set<String> getRequestedPartitions() {
  return this.requestedPartitions;
}
@ -617,8 +628,10 @@ public class AppSchedulingInfo {
ap.getPrimaryRequestedNodePartition(), delta);
}
}
oldMetrics.moveAppFrom(this);
newMetrics.moveAppTo(this);
oldMetrics.moveAppFrom(this, isUnmanagedAM());
newMetrics.moveAppTo(this, isUnmanagedAM());
abstractUsersManager.deactivateApplication(user, applicationId);
abstractUsersManager = newQueue.getAbstractUsersManager();
if (!schedulerKeys.isEmpty()) {
@ -649,7 +662,8 @@ public class AppSchedulingInfo {
ask.getCount()));
}
}
metrics.finishAppAttempt(applicationId, pending, user);
metrics.finishAppAttempt(applicationId, pending, user, unmanagedAM);
// Clear requests themselves
clearRequests();
@ -695,7 +709,7 @@ public class AppSchedulingInfo {
// If there was any container to recover, the application was
// running from scheduler's POV.
pending = false;
metrics.runAppAttempt(applicationId, user);
metrics.runAppAttempt(applicationId, user, isUnmanagedAM());
}
// Container is completed. Skip recovering resources.
@ -736,7 +750,7 @@ public class AppSchedulingInfo {
// once an allocation is done we assume the application is
// running from scheduler's POV.
pending = false;
metrics.runAppAttempt(applicationId, user);
metrics.runAppAttempt(applicationId, user, isUnmanagedAM());
}
updateMetrics(applicationId, type, node, containerAllocated, user, queue);

View File

@ -62,6 +62,20 @@ public class QueueMetrics implements MetricsSource {
@Metric("# of apps completed") MutableCounterInt appsCompleted;
@Metric("# of apps killed") MutableCounterInt appsKilled;
@Metric("# of apps failed") MutableCounterInt appsFailed;
@Metric("# of Unmanaged apps submitted")
private MutableCounterInt unmanagedAppsSubmitted;
@Metric("# of Unmanaged running apps")
private MutableGaugeInt unmanagedAppsRunning;
@Metric("# of Unmanaged pending apps")
private MutableGaugeInt unmanagedAppsPending;
@Metric("# of Unmanaged apps completed")
private MutableCounterInt unmanagedAppsCompleted;
@Metric("# of Unmanaged apps killed")
private MutableCounterInt unmanagedAppsKilled;
@Metric("# of Unmanaged apps failed")
private MutableCounterInt unmanagedAppsFailed;
@Metric("Aggregate # of allocated node-local containers")
MutableCounterLong aggregateNodeLocalContainersAllocated;
@Metric("Aggregate # of allocated rack-local containers")
@ -401,103 +415,158 @@ public class QueueMetrics implements MetricsSource {
registry.snapshot(collector.addRecord(registry.info()), all);
}
/**
 * Records an application submission in this queue's metrics.
 * Increments {@code appsSubmitted} and, when {@code unmanagedAM} is true,
 * {@code unmanagedAppsSubmitted}; the same update is then applied to the
 * per-user metrics (when present) and to the parent metrics (when present).
 *
 * @param user user whose per-user metrics are updated as well
 * @param unmanagedAM true to also update the unmanaged-application counter
 */
public void submitApp(String user) {
public void submitApp(String user, boolean unmanagedAM) {
appsSubmitted.incr();
if(unmanagedAM) {
unmanagedAppsSubmitted.incr();
}
QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) {
userMetrics.submitApp(user);
userMetrics.submitApp(user, unmanagedAM);
}
// Propagate the update up the queue hierarchy.
if (parent != null) {
parent.submitApp(user);
parent.submitApp(user, unmanagedAM);
}
}
/**
 * Records a newly submitted application attempt as pending.
 * Increments {@code appsPending} and, when {@code unmanagedAM} is true,
 * {@code unmanagedAppsPending}; the same update is then applied to the
 * per-user metrics (when present) and to the parent metrics (when present).
 *
 * @param user user whose per-user metrics are updated as well
 * @param unmanagedAM true to also update the unmanaged-application gauge
 */
public void submitAppAttempt(String user) {
public void submitAppAttempt(String user, boolean unmanagedAM) {
appsPending.incr();
if(unmanagedAM) {
unmanagedAppsPending.incr();
}
QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) {
userMetrics.submitAppAttempt(user);
userMetrics.submitAppAttempt(user, unmanagedAM);
}
// Propagate the update up the queue hierarchy.
if (parent != null) {
parent.submitAppAttempt(user);
parent.submitAppAttempt(user, unmanagedAM);
}
}
/**
 * Moves an application attempt from the pending gauges to the running
 * gauges. The application is added to {@code runBuckets} with the current
 * wall-clock time, {@code appsRunning} is incremented and
 * {@code appsPending} decremented; the unmanaged gauges follow when
 * {@code unmanagedAM} is true. The same update is then applied to the
 * per-user metrics (when present) and to the parent metrics (when present).
 *
 * @param appId id of the application, used as the {@code runBuckets} key
 * @param user user whose per-user metrics are updated as well
 * @param unmanagedAM true to also update the unmanaged-application gauges
 */
public void runAppAttempt(ApplicationId appId, String user) {
public void runAppAttempt(ApplicationId appId, String user,
boolean unmanagedAM) {
runBuckets.add(appId, System.currentTimeMillis());
// Pending -> running transition.
appsRunning.incr();
appsPending.decr();
if(unmanagedAM) {
unmanagedAppsRunning.incr();
unmanagedAppsPending.decr();
}
QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) {
userMetrics.runAppAttempt(appId, user);
userMetrics.runAppAttempt(appId, user, unmanagedAM);
}
// Propagate the update up the queue hierarchy.
if (parent != null) {
parent.runAppAttempt(appId, user);
parent.runAppAttempt(appId, user, unmanagedAM);
}
}
/**
 * Removes a finished application attempt from the pending or running
 * gauges. The application is removed from {@code runBuckets}; then
 * {@code appsPending} is decremented when {@code isPending} is true,
 * otherwise {@code appsRunning}. The unmanaged gauges follow the same rule
 * when {@code unmanagedAM} is true. The same update is then applied to the
 * per-user metrics (when present) and to the parent metrics (when present).
 *
 * @param appId id of the application, used as the {@code runBuckets} key
 * @param isPending true to decrement the pending gauge, false to decrement
 *                  the running gauge
 * @param user user whose per-user metrics are updated as well
 * @param unmanagedAM true to also update the unmanaged-application gauges
 */
public void finishAppAttempt(
ApplicationId appId, boolean isPending, String user) {
public void finishAppAttempt(ApplicationId appId, boolean isPending,
String user, boolean unmanagedAM) {
runBuckets.remove(appId);
if (isPending) {
appsPending.decr();
} else {
appsRunning.decr();
}
// Mirror the same pending/running choice on the unmanaged gauges.
if(unmanagedAM) {
if (isPending) {
unmanagedAppsPending.decr();
} else {
unmanagedAppsRunning.decr();
}
}
QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) {
userMetrics.finishAppAttempt(appId, isPending, user);
userMetrics.finishAppAttempt(appId, isPending, user, unmanagedAM);
}
// Propagate the update up the queue hierarchy.
if (parent != null) {
parent.finishAppAttempt(appId, isPending, user);
parent.finishAppAttempt(appId, isPending, user, unmanagedAM);
}
}
/**
 * Counts a finished application by its final state.
 * {@code KILLED} increments {@code appsKilled}, {@code FAILED} increments
 * {@code appsFailed}, and every other state increments
 * {@code appsCompleted}. The unmanaged counters follow the same mapping
 * when {@code unmanagedAM} is true. The same update is then applied to the
 * per-user metrics (when present) and to the parent metrics (when present).
 *
 * @param user user whose per-user metrics are updated as well
 * @param rmAppFinalState final state that selects the counter to increment
 * @param unmanagedAM true to also update the unmanaged-application counters
 */
public void finishApp(String user, RMAppState rmAppFinalState) {
public void finishApp(String user, RMAppState rmAppFinalState,
boolean unmanagedAM) {
switch (rmAppFinalState) {
case KILLED: appsKilled.incr(); break;
case FAILED: appsFailed.incr(); break;
default: appsCompleted.incr(); break;
}
// Same state-to-counter mapping, applied to the unmanaged counters.
if(unmanagedAM) {
switch (rmAppFinalState) {
case KILLED:
unmanagedAppsKilled.incr();
break;
case FAILED:
unmanagedAppsFailed.incr();
break;
default:
unmanagedAppsCompleted.incr();
break;
}
}
QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) {
userMetrics.finishApp(user, rmAppFinalState);
userMetrics.finishApp(user, rmAppFinalState, unmanagedAM);
}
// Propagate the update up the queue hierarchy.
if (parent != null) {
parent.finishApp(user, rmAppFinalState);
parent.finishApp(user, rmAppFinalState, unmanagedAM);
}
}
/**
 * Removes an application from this queue's pending or running gauges.
 * {@code appsPending} is decremented when {@code app.isPending()} is true,
 * otherwise {@code appsRunning}; the unmanaged gauges follow the same rule
 * when {@code unmanagedAM} is true. The same update is then applied to the
 * per-user metrics for {@code app.getUser()} (when present) and to the
 * parent metrics (when present).
 *
 * @param app application whose pending state and user drive the update
 * @param unmanagedAM true to also update the unmanaged-application gauges
 */
public void moveAppFrom(AppSchedulingInfo app) {
public void moveAppFrom(AppSchedulingInfo app, boolean unmanagedAM) {
if (app.isPending()) {
appsPending.decr();
} else {
appsRunning.decr();
}
// Mirror the same pending/running choice on the unmanaged gauges.
if(unmanagedAM) {
if (app.isPending()) {
unmanagedAppsPending.decr();
} else {
unmanagedAppsRunning.decr();
}
}
QueueMetrics userMetrics = getUserMetrics(app.getUser());
if (userMetrics != null) {
userMetrics.moveAppFrom(app);
userMetrics.moveAppFrom(app, unmanagedAM);
}
// Propagate the update up the queue hierarchy.
if (parent != null) {
parent.moveAppFrom(app);
parent.moveAppFrom(app, unmanagedAM);
}
}
/**
 * Adds an application to this queue's pending or running gauges.
 * {@code appsPending} is incremented when {@code app.isPending()} is true,
 * otherwise {@code appsRunning}; the unmanaged gauges follow the same rule
 * when {@code unmanagedAM} is true. The same update is then applied to the
 * per-user metrics for {@code app.getUser()} (when present) and to the
 * parent metrics (when present).
 *
 * @param app application whose pending state and user drive the update
 * @param unmanagedAM true to also update the unmanaged-application gauges
 */
public void moveAppTo(AppSchedulingInfo app) {
public void moveAppTo(AppSchedulingInfo app, boolean unmanagedAM) {
if (app.isPending()) {
appsPending.incr();
} else {
appsRunning.incr();
}
// Mirror the same pending/running choice on the unmanaged gauges.
if(unmanagedAM) {
if (app.isPending()) {
unmanagedAppsPending.incr();
} else {
unmanagedAppsRunning.incr();
}
}
QueueMetrics userMetrics = getUserMetrics(app.getUser());
if (userMetrics != null) {
userMetrics.moveAppTo(app);
userMetrics.moveAppTo(app, unmanagedAM);
}
// Propagate the update up the queue hierarchy.
if (parent != null) {
parent.moveAppTo(app);
parent.moveAppTo(app, unmanagedAM);
}
}
/**
* Set available resources. To be called by scheduler periodically as
* resources become available.
@ -1024,18 +1093,34 @@ public class QueueMetrics implements MetricsSource {
return appsSubmitted.value();
}
/**
 * @return current value of the {@code unmanagedAppsSubmitted} counter
 */
public int getUnmanagedAppsSubmitted() {
  return this.unmanagedAppsSubmitted.value();
}
/**
 * @return current value of the {@code appsRunning} gauge
 */
public int getAppsRunning() {
  return this.appsRunning.value();
}
/**
 * @return current value of the {@code unmanagedAppsRunning} gauge
 */
public int getUnmanagedAppsRunning() {
  return this.unmanagedAppsRunning.value();
}
/**
 * @return current value of the {@code appsPending} gauge
 */
public int getAppsPending() {
  return this.appsPending.value();
}
/**
 * @return current value of the {@code unmanagedAppsPending} gauge
 */
public int getUnmanagedAppsPending() {
  return this.unmanagedAppsPending.value();
}
/**
 * @return current value of the {@code appsCompleted} counter
 */
public int getAppsCompleted() {
  return this.appsCompleted.value();
}
/**
 * @return current value of the {@code unmanagedAppsCompleted} counter
 */
public int getUnmanagedAppsCompleted() {
  return this.unmanagedAppsCompleted.value();
}
/**
 * @return current value of the {@code appsKilled} counter
 */
public int getAppsKilled() {
  return appsKilled.value();
}

/**
 * Accessor for the unmanaged counterpart of {@link #getAppsKilled()}, so
 * that every unmanaged-application metric has a getter.
 *
 * @return current value of the {@code unmanagedAppsKilled} counter
 */
public int getUnmanagedAppsKilled() {
  return unmanagedAppsKilled.value();
}
@ -1044,6 +1129,10 @@ public class QueueMetrics implements MetricsSource {
return appsFailed.value();
}
/**
 * @return current value of the {@code unmanagedAppsFailed} counter
 */
public int getUnmanagedAppsFailed() {
  return this.unmanagedAppsFailed.value();
}
public Resource getAllocatedResources() {
if (queueMetricsForCustomResources != null) {
return Resource.newInstance(allocatedMB.value(), allocatedVCores.value(),

View File

@ -30,16 +30,20 @@ public class SchedulerApplication<T extends SchedulerApplicationAttempt> {
private final String user;
private volatile T currentAttempt;
private volatile Priority priority;
// Assigned exactly once by each constructor and never reassigned, so it is
// declared final like the user field above.
private final boolean unmanagedAM;
/**
 * Creates a scheduler application with no priority set
 * ({@code priority} is initialised to {@code null}).
 *
 * @param queue queue stored for this application
 * @param user user stored for this application
 * @param unmanagedAM unmanaged-AM flag later reported by
 *                    {@code isUnmanagedAM()}
 */
public SchedulerApplication(Queue queue, String user) {
public SchedulerApplication(Queue queue, String user, boolean unmanagedAM) {
this.queue = queue;
this.user = user;
this.unmanagedAM = unmanagedAM;
this.priority = null;
}
/**
 * Creates a scheduler application with an explicit priority.
 *
 * @param queue queue stored for this application
 * @param user user stored for this application
 * @param priority initial priority stored for this application
 * @param unmanagedAM unmanaged-AM flag later reported by
 *                    {@code isUnmanagedAM()}
 */
public SchedulerApplication(Queue queue, String user, Priority priority) {
public SchedulerApplication(Queue queue, String user, Priority priority,
boolean unmanagedAM) {
this.queue = queue;
this.user = user;
this.unmanagedAM = unmanagedAM;
this.priority = priority;
}
@ -64,7 +68,7 @@ public class SchedulerApplication<T extends SchedulerApplicationAttempt> {
}
/**
 * Reports the application's final state to the metrics of its queue by
 * calling {@code finishApp} with this application's user and unmanaged-AM
 * flag.
 *
 * @param rmAppFinalState final state forwarded to the queue metrics
 */
public void stop(RMAppState rmAppFinalState) {
queue.getMetrics().finishApp(user, rmAppFinalState);
queue.getMetrics().finishApp(user, rmAppFinalState, isUnmanagedAM());
}
public Priority getPriority() {
@ -80,4 +84,7 @@ public class SchedulerApplication<T extends SchedulerApplicationAttempt> {
}
}
/**
 * @return the unmanaged-AM flag given to the constructor
 */
public boolean isUnmanagedAM() {
  return this.unmanagedAM;
}
}

View File

@ -241,7 +241,7 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
this.appSchedulingInfo = new AppSchedulingInfo(applicationAttemptId, user,
queue, abstractUsersManager, rmContext.getEpoch(), attemptResourceUsage,
applicationSchedulingEnvs, rmContext);
applicationSchedulingEnvs, rmContext, unmanagedAM);
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
readLock = lock.readLock();
writeLock = lock.writeLock();

View File

@ -865,7 +865,8 @@ public class CapacityScheduler extends
private void addApplicationOnRecovery(ApplicationId applicationId,
String queueName, String user,
Priority priority, ApplicationPlacementContext placementContext) {
Priority priority, ApplicationPlacementContext placementContext,
boolean unmanagedAM) {
writeLock.lock();
try {
//check if the queue needs to be auto-created during recovery
@ -927,9 +928,11 @@ public class CapacityScheduler extends
// Ignore the exception for recovered app as the app was previously
// accepted.
}
queue.getMetrics().submitApp(user);
queue.getMetrics().submitApp(user, unmanagedAM);
SchedulerApplication<FiCaSchedulerApp> application =
new SchedulerApplication<FiCaSchedulerApp>(queue, user, priority);
new SchedulerApplication<FiCaSchedulerApp>(queue, user, priority,
unmanagedAM);
applications.put(applicationId, application);
LOG.info("Accepted application " + applicationId + " from user: " + user
+ ", in queue: " + queueName);
@ -1012,7 +1015,7 @@ public class CapacityScheduler extends
private void addApplication(ApplicationId applicationId, String queueName,
String user, Priority priority,
ApplicationPlacementContext placementContext) {
ApplicationPlacementContext placementContext, boolean unmanagedAM) {
writeLock.lock();
try {
if (isSystemAppsLimitReached()) {
@ -1116,9 +1119,10 @@ public class CapacityScheduler extends
return;
}
// update the metrics
queue.getMetrics().submitApp(user);
queue.getMetrics().submitApp(user, unmanagedAM);
SchedulerApplication<FiCaSchedulerApp> application =
new SchedulerApplication<FiCaSchedulerApp>(queue, user, priority);
new SchedulerApplication<FiCaSchedulerApp>(queue, user, priority,
unmanagedAM);
applications.put(applicationId, application);
LOG.info("Accepted application " + applicationId + " from user: " + user
+ ", in queue: " + queueName);
@ -1986,11 +1990,13 @@ public class CapacityScheduler extends
if (!appAddedEvent.getIsAppRecovering()) {
addApplication(appAddedEvent.getApplicationId(), queueName,
appAddedEvent.getUser(), appAddedEvent.getApplicatonPriority(),
appAddedEvent.getPlacementContext());
appAddedEvent.getPlacementContext(),
appAddedEvent.isUnmanagedAM());
} else {
addApplicationOnRecovery(appAddedEvent.getApplicationId(), queueName,
appAddedEvent.getUser(), appAddedEvent.getApplicatonPriority(),
appAddedEvent.getPlacementContext());
appAddedEvent.getPlacementContext(),
appAddedEvent.isUnmanagedAM());
}
}
}

View File

@ -603,7 +603,9 @@ public class LeafQueue extends AbstractCSQueue {
// We don't want to update metrics for move app
if (!isMoveApp) {
metrics.submitAppAttempt(userName);
boolean unmanagedAM = application.getAppSchedulingInfo() != null &&
application.getAppSchedulingInfo().isUnmanagedAM();
metrics.submitAppAttempt(userName, unmanagedAM);
}
getParent().submitApplicationAttempt(application, userName);

View File

@ -34,6 +34,7 @@ public class AppAddedSchedulerEvent extends SchedulerEvent {
private final boolean isAppRecovering;
private final Priority appPriority;
private final ApplicationPlacementContext placementContext;
private boolean unmanagedAM = false;
public AppAddedSchedulerEvent(ApplicationId applicationId, String queue,
String user) {
@ -58,6 +59,7 @@ public class AppAddedSchedulerEvent extends SchedulerEvent {
this(submissionContext.getApplicationId(), submissionContext.getQueue(),
user, isAppRecovering, submissionContext.getReservationID(),
appPriority, null);
this.unmanagedAM = submissionContext.getUnmanagedAM();
}
public AppAddedSchedulerEvent(String user,
@ -66,6 +68,7 @@ public class AppAddedSchedulerEvent extends SchedulerEvent {
this(submissionContext.getApplicationId(), submissionContext.getQueue(),
user, isAppRecovering, submissionContext.getReservationID(),
appPriority, placementContext);
this.unmanagedAM = submissionContext.getUnmanagedAM();
}
public AppAddedSchedulerEvent(ApplicationId applicationId, String queue,
@ -108,4 +111,8 @@ public class AppAddedSchedulerEvent extends SchedulerEvent {
/**
 * @return the value of the {@code placementContext} field
 */
public ApplicationPlacementContext getPlacementContext() {
  return this.placementContext;
}
/**
 * @return the unmanaged-AM flag of this event; {@code false} unless a
 *         constructor set it from the submission context
 */
public boolean isUnmanagedAM() {
  return this.unmanagedAM;
}
}

View File

@ -552,11 +552,15 @@ public class FairScheduler extends
return;
}
}
boolean unmanagedAM = rmApp != null &&
rmApp.getApplicationSubmissionContext() != null
&& rmApp.getApplicationSubmissionContext().getUnmanagedAM();
SchedulerApplication<FSAppAttempt> application =
new SchedulerApplication<>(queue, user);
new SchedulerApplication<>(queue, user, unmanagedAM);
applications.put(applicationId, application);
queue.getMetrics().submitApp(user);
queue.getMetrics().submitApp(user, unmanagedAM);
LOG.info("Accepted application " + applicationId + " from user: " + user
+ ", in queue: " + queueName
@ -610,7 +614,7 @@ public class FairScheduler extends
maxRunningEnforcer.trackNonRunnableApp(attempt);
}
queue.getMetrics().submitAppAttempt(user);
queue.getMetrics().submitAppAttempt(user, application.isUnmanagedAM());
LOG.info("Added Application Attempt " + applicationAttemptId
+ " to scheduler from user: " + user);

View File

@ -389,11 +389,13 @@ public class FifoScheduler extends
@VisibleForTesting
public synchronized void addApplication(ApplicationId applicationId,
String queue, String user, boolean isAppRecovering) {
String queue, String user, boolean isAppRecovering,
boolean unmanagedAM) {
SchedulerApplication<FifoAppAttempt> application =
new SchedulerApplication<>(DEFAULT_QUEUE, user);
new SchedulerApplication<>(DEFAULT_QUEUE, user, unmanagedAM);
applications.put(applicationId, application);
metrics.submitApp(user);
metrics.submitApp(user, unmanagedAM);
LOG.info("Accepted application " + applicationId + " from user: " + user
+ ", currently num of applications: " + applications.size());
if (isAppRecovering) {
@ -424,7 +426,8 @@ public class FifoScheduler extends
}
application.setCurrentAppAttempt(schedulerApp);
metrics.submitAppAttempt(user);
metrics.submitAppAttempt(user, application.isUnmanagedAM());
LOG.info("Added Application Attempt " + appAttemptId
+ " to scheduler from user " + application.getUser());
if (isAttemptRecovering) {
@ -768,8 +771,8 @@ public class FifoScheduler extends
{
AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
addApplication(appAddedEvent.getApplicationId(),
appAddedEvent.getQueue(), appAddedEvent.getUser(),
appAddedEvent.getIsAppRecovering());
appAddedEvent.getQueue(), appAddedEvent.getUser(),
appAddedEvent.getIsAppRecovering(), appAddedEvent.isUnmanagedAM());
}
break;
case APP_REMOVED:

View File

@ -1559,6 +1559,8 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
nm1.registerNode();
QueueMetrics qm1 = rm1.getResourceScheduler().getRootQueueMetrics();
assertUnmanagedAMQueueMetrics(qm1, 0, 0, 0, 0);
// create app and launch the UAM
MockRMAppSubmissionData data =
MockRMAppSubmissionData.Builder.createWithMemory(200, rm1)
@ -1567,6 +1569,7 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
RMApp app0 = MockRMAppSubmitter.submit(rm1, data);
MockAM am0 = MockRM.launchUAM(app0, rm1, nm1);
am0.registerAppAttempt();
assertUnmanagedAMQueueMetrics(qm1, 1, 1, 0, 0);
// Allocate containers to UAM
int numContainers = 2;
@ -1581,17 +1584,19 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
new ArrayList<ContainerId>()).getAllocatedContainers());
Thread.sleep(100);
}
assertUnmanagedAMQueueMetrics(qm1, 1, 0, 1, 0);
// start new RM
rm2 = new MockRM(conf, memStore);
rm2.start();
MockMemoryRMStateStore memStore2 =
(MockMemoryRMStateStore) rm2.getRMStateStore();
QueueMetrics qm2 = rm2.getResourceScheduler().getRootQueueMetrics();
rm2.waitForState(app0.getApplicationId(), RMAppState.ACCEPTED);
rm2.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.LAUNCHED);
// recover app
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
assertUnmanagedAMQueueMetrics(qm2, 1, 1, 0, 0);
RMApp recoveredApp =
rm2.getRMContext().getRMApps().get(app0.getApplicationId());
NMContainerStatus container1 = TestRMRestart
@ -1601,13 +1606,13 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
.createNMContainerStatus(am0.getApplicationAttemptId(), 2,
ContainerState.RUNNING);
nm1.registerNode(Arrays.asList(container1, container2), null);
// Wait for RM to settle down on recovering containers;
waitForNumContainersToRecover(2, rm2, am0.getApplicationAttemptId());
// retry registerApplicationMaster() after RM restart.
am0.setAMRMProtocol(rm2.getApplicationMasterService(), rm2.getRMContext());
am0.registerAppAttempt(true);
assertUnmanagedAMQueueMetrics(qm2, 1, 0, 1, 0);
// Check if UAM is correctly recovered on restart
rm2.waitForState(app0.getApplicationId(), RMAppState.RUNNING);
@ -1626,6 +1631,7 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
// Check if UAM is able to heart beat
Assert.assertNotNull(am0.doHeartbeat());
assertUnmanagedAMQueueMetrics(qm2, 1, 0, 1, 0);
// Complete the UAM
am0.unregisterAppAttempt(false);
@ -1633,15 +1639,26 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
rm2.waitForState(app0.getApplicationId(), RMAppState.FINISHED);
Assert.assertEquals(FinalApplicationStatus.SUCCEEDED,
recoveredApp.getFinalApplicationStatus());
assertUnmanagedAMQueueMetrics(qm2, 1, 0, 0, 1);
// Restart RM once more to check UAM is not re-run
MockRM rm3 = new MockRM(conf, memStore2);
rm3.start();
recoveredApp = rm3.getRMContext().getRMApps().get(app0.getApplicationId());
QueueMetrics qm3 = rm3.getResourceScheduler().getRootQueueMetrics();
Assert.assertEquals(RMAppState.FINISHED, recoveredApp.getState());
assertUnmanagedAMQueueMetrics(qm2, 1, 0, 0, 1);
}
/**
 * Asserts the unmanaged-application metrics exposed by {@code qm}.
 * Each assertion carries a message naming the metric, so a failure shows
 * which of the four values diverged.
 *
 * @param qm queue metrics to inspect
 * @param appsSubmitted expected {@code getUnmanagedAppsSubmitted()} value
 * @param appsPending expected {@code getUnmanagedAppsPending()} value
 * @param appsRunning expected {@code getUnmanagedAppsRunning()} value
 * @param appsCompleted expected {@code getUnmanagedAppsCompleted()} value
 */
private void assertUnmanagedAMQueueMetrics(QueueMetrics qm, int appsSubmitted,
    int appsPending, int appsRunning, int appsCompleted) {
  Assert.assertEquals("Unexpected number of unmanaged apps submitted",
      appsSubmitted, qm.getUnmanagedAppsSubmitted());
  Assert.assertEquals("Unexpected number of unmanaged apps pending",
      appsPending, qm.getUnmanagedAppsPending());
  Assert.assertEquals("Unexpected number of unmanaged apps running",
      appsRunning, qm.getUnmanagedAppsRunning());
  Assert.assertEquals("Unexpected number of unmanaged apps completed",
      appsCompleted, qm.getUnmanagedAppsCompleted());
}
@Test(timeout = 30000)
public void testUnknownUserOnRecovery() throws Exception {

View File

@ -33,6 +33,12 @@ import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppMetrics
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppMetricsChecker.AppMetricsKey.APPS_PENDING;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppMetricsChecker.AppMetricsKey.APPS_RUNNING;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppMetricsChecker.AppMetricsKey.APPS_SUBMITTED;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppMetricsChecker.AppMetricsKey.UNMANAGED_APPS_COMPLETED;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppMetricsChecker.AppMetricsKey.UNMANAGED_APPS_FAILED;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppMetricsChecker.AppMetricsKey.UNMANAGED_APPS_KILLED;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppMetricsChecker.AppMetricsKey.UNMANAGED_APPS_PENDING;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppMetricsChecker.AppMetricsKey.UNMANAGED_APPS_RUNNING;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppMetricsChecker.AppMetricsKey.UNMANAGED_APPS_SUBMITTED;
final class AppMetricsChecker {
private final static Logger LOG =
@ -45,7 +51,13 @@ final class AppMetricsChecker {
.gaugeInt(APPS_RUNNING, 0)
.counter(APPS_COMPLETED, 0)
.counter(APPS_FAILED, 0)
.counter(APPS_KILLED, 0);
.counter(APPS_KILLED, 0)
.counter(UNMANAGED_APPS_SUBMITTED, 0)
.gaugeInt(UNMANAGED_APPS_PENDING, 0)
.gaugeInt(UNMANAGED_APPS_RUNNING, 0)
.counter(UNMANAGED_APPS_COMPLETED, 0)
.counter(UNMANAGED_APPS_FAILED, 0)
.counter(UNMANAGED_APPS_KILLED, 0);
enum AppMetricsKey {
APPS_SUBMITTED("AppsSubmitted"),
@ -53,7 +65,13 @@ final class AppMetricsChecker {
APPS_RUNNING("AppsRunning"),
APPS_COMPLETED("AppsCompleted"),
APPS_FAILED("AppsFailed"),
APPS_KILLED("AppsKilled");
APPS_KILLED("AppsKilled"),
UNMANAGED_APPS_SUBMITTED("UnmanagedAppsSubmitted"),
UNMANAGED_APPS_PENDING("UnmanagedAppsPending"),
UNMANAGED_APPS_RUNNING("UnmanagedAppsRunning"),
UNMANAGED_APPS_COMPLETED("UnmanagedAppsCompleted"),
UNMANAGED_APPS_FAILED("UnmanagedAppsFailed"),
UNMANAGED_APPS_KILLED("UnmanagedAppsKilled");
private String value;

View File

@ -405,7 +405,8 @@ public class TestAbstractYarnScheduler extends ParameterizedSchedulerTestBase {
RMApp mockAPp =
new MockRMApp(125, System.currentTimeMillis(), RMAppState.NEW);
SchedulerApplication<FiCaSchedulerApp> application =
new SchedulerApplication<FiCaSchedulerApp>(null, mockAPp.getUser());
new SchedulerApplication<FiCaSchedulerApp>(null, mockAPp.getUser(),
false);
// Second app with one app attempt
RMApp app = MockRMAppSubmitter.submitWithMemory(200, rm1);

View File

@ -50,7 +50,7 @@ public class TestAppSchedulingInfo {
doReturn(new YarnConfiguration()).when(rmContext).getYarnConfiguration();
AppSchedulingInfo appSchedulingInfo = new AppSchedulingInfo(appAttemptId,
"test", queue, null, 0, new ResourceUsage(),
new HashMap<String, String>(), rmContext);
new HashMap<String, String>(), rmContext, false);
appSchedulingInfo.updatePlacesBlacklistedByApp(new ArrayList<String>(),
new ArrayList<String>());
@ -124,7 +124,7 @@ public class TestAppSchedulingInfo {
doReturn(new YarnConfiguration()).when(rmContext).getYarnConfiguration();
AppSchedulingInfo info = new AppSchedulingInfo(
appAttemptId, "test", queue, mock(ActiveUsersManager.class), 0,
new ResourceUsage(), new HashMap<>(), rmContext);
new ResourceUsage(), new HashMap<>(), rmContext, false);
Assert.assertEquals(0, info.getSchedulerKeys().size());
Priority pri1 = Priority.newInstance(1);

View File

@ -86,8 +86,8 @@ public class TestPartitionQueueMetrics {
QueueMetrics q2 =
QueueMetrics.forQueue(ms, "root.q2", parentQueue, true, CONF);
q1.submitApp(user);
q1.submitAppAttempt(user);
q1.submitApp(user, false);
q1.submitAppAttempt(user, false);
root.setAvailableResourcesToQueue("x",
Resources.createResource(200 * GB, 200));
@ -140,8 +140,8 @@ public class TestPartitionQueueMetrics {
QueueMetrics.forQueue(ms, "root.q2", parentQueue, false, CONF);
AppSchedulingInfo app = mockApp(user);
q1.submitApp(user);
q1.submitAppAttempt(user);
q1.submitApp(user, false);
q1.submitAppAttempt(user, false);
root.setAvailableResourcesToQueue("x",
Resources.createResource(200 * GB, 200));
@ -414,8 +414,8 @@ public class TestPartitionQueueMetrics {
QueueMetrics.forQueue(ms, leafQueueName, parentQueue, true, CONF);
AppSchedulingInfo app = mockApp(user);
metrics.submitApp(user);
metrics.submitAppAttempt(user);
metrics.submitApp(user, false);
metrics.submitAppAttempt(user, false);
parentMetrics.setAvailableResourcesToQueue(partition,
Resources.createResource(100 * GB, 100));
@ -447,7 +447,7 @@ public class TestPartitionQueueMetrics {
checkResources(partitionSource, 0, 0, 0, 0, 0, 100 * GB, 100, 18 * GB, 18,
6, 0, 0, 0);
metrics.runAppAttempt(app.getApplicationId(), user);
metrics.runAppAttempt(app.getApplicationId(), user, false);
metrics.allocateResources(partition, user, 3,
Resources.createResource(1 * GB, 1), true);
@ -491,9 +491,9 @@ public class TestPartitionQueueMetrics {
0, 0, 0);
metrics.finishAppAttempt(app.getApplicationId(), app.isPending(),
app.getUser());
app.getUser(), false);
metrics.finishApp(user, RMAppState.FINISHED);
metrics.finishApp(user, RMAppState.FINISHED, false);
}
@Test
@ -519,8 +519,8 @@ public class TestPartitionQueueMetrics {
QueueMetrics.forQueue(leafQueueName1, leafQueue, true, CONF);
AppSchedulingInfo app = mockApp(user);
metrics1.submitApp(user);
metrics1.submitAppAttempt(user);
metrics1.submitApp(user, false);
metrics1.submitAppAttempt(user, false);
parentMetrics.setAvailableResourcesToQueue(partitionX,
Resources.createResource(200 * GB, 200));
@ -615,9 +615,9 @@ public class TestPartitionQueueMetrics {
0, 0);
metrics1.finishAppAttempt(app.getApplicationId(), app.isPending(),
app.getUser());
app.getUser(), false);
metrics1.finishApp(user, RMAppState.FINISHED);
metrics1.finishApp(user, RMAppState.FINISHED, false);
}
/**
@ -650,8 +650,8 @@ public class TestPartitionQueueMetrics {
AppSchedulingInfo app = mockApp(user);
q1.submitApp(user);
q1.submitAppAttempt(user);
q1.submitApp(user, false);
q1.submitAppAttempt(user, false);
root.setAvailableResourcesToQueue("x",
Resources.createResource(200 * GB, 200));
@ -680,8 +680,9 @@ public class TestPartitionQueueMetrics {
checkResources(q2Source, 0, 0, 0, 0, 0, 3 * GB, 3, 3);
checkResources(q2UserSource, 0, 0, 0, 0, 0, 3 * GB, 3, 3);
q1.finishAppAttempt(app.getApplicationId(), app.isPending(), app.getUser());
q1.finishApp(user, RMAppState.FINISHED);
q1.finishAppAttempt(app.getApplicationId(), app.isPending(), app.getUser(),
false);
q1.finishApp(user, RMAppState.FINISHED, false);
}
public static MetricsSource partitionSource(MetricsSystem ms,

View File

@ -44,6 +44,10 @@ import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppMetrics
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppMetricsChecker.AppMetricsKey.APPS_PENDING;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppMetricsChecker.AppMetricsKey.APPS_RUNNING;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppMetricsChecker.AppMetricsKey.APPS_SUBMITTED;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppMetricsChecker.AppMetricsKey.UNMANAGED_APPS_FAILED;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppMetricsChecker.AppMetricsKey.UNMANAGED_APPS_PENDING;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppMetricsChecker.AppMetricsKey.UNMANAGED_APPS_RUNNING;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppMetricsChecker.AppMetricsKey.UNMANAGED_APPS_SUBMITTED;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.AGGREGATE_CONTAINERS_ALLOCATED;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.AGGREGATE_CONTAINERS_RELEASED;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_CONTAINERS;
@ -89,12 +93,12 @@ public class TestQueueMetrics {
MetricsSource queueSource= queueSource(ms, queueName);
AppSchedulingInfo app = mockApp(USER);
metrics.submitApp(USER);
metrics.submitApp(USER, false);
MetricsSource userSource = userSource(ms, queueName, USER);
AppMetricsChecker appMetricsChecker = AppMetricsChecker.create()
.counter(APPS_SUBMITTED, 1)
.checkAgainst(queueSource, true);
metrics.submitAppAttempt(USER);
metrics.submitAppAttempt(USER, false);
appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker)
.gaugeInt(APPS_PENDING, 1)
.checkAgainst(queueSource, true);
@ -111,7 +115,7 @@ public class TestQueueMetrics {
.gaugeLong(PENDING_MB, 15 * GB).gaugeInt(PENDING_V_CORES, 15)
.gaugeInt(PENDING_CONTAINERS, 5).checkAgainst(queueSource);
metrics.runAppAttempt(app.getApplicationId(), USER);
metrics.runAppAttempt(app.getApplicationId(), USER, false);
appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker)
.gaugeInt(APPS_PENDING, 0)
.gaugeInt(APPS_RUNNING, 1)
@ -151,12 +155,12 @@ public class TestQueueMetrics {
.checkAgainst(queueSource);
metrics.finishAppAttempt(
app.getApplicationId(), app.isPending(), app.getUser());
app.getApplicationId(), app.isPending(), app.getUser(), false);
appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker)
.counter(APPS_SUBMITTED, 1)
.gaugeInt(APPS_RUNNING, 0)
.checkAgainst(queueSource, true);
metrics.finishApp(USER, RMAppState.FINISHED);
metrics.finishApp(USER, RMAppState.FINISHED, false);
AppMetricsChecker.createFromChecker(appMetricsChecker)
.counter(APPS_COMPLETED, 1)
.checkAgainst(queueSource, true);
@ -172,36 +176,36 @@ public class TestQueueMetrics {
MetricsSource queueSource = queueSource(ms, queueName);
AppSchedulingInfo app = mockApp(USER);
metrics.submitApp(USER);
metrics.submitApp(USER, false);
MetricsSource userSource = userSource(ms, queueName, USER);
AppMetricsChecker appMetricsChecker = AppMetricsChecker.create()
.counter(APPS_SUBMITTED, 1)
.checkAgainst(queueSource, true);
metrics.submitAppAttempt(USER);
metrics.submitAppAttempt(USER, false);
appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker)
.gaugeInt(APPS_PENDING, 1)
.checkAgainst(queueSource, true);
metrics.runAppAttempt(app.getApplicationId(), USER);
metrics.runAppAttempt(app.getApplicationId(), USER, false);
appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker)
.gaugeInt(APPS_PENDING, 0)
.gaugeInt(APPS_RUNNING, 1)
.checkAgainst(queueSource, true);
metrics.finishAppAttempt(
app.getApplicationId(), app.isPending(), app.getUser());
app.getApplicationId(), app.isPending(), app.getUser(), false);
appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker)
.gaugeInt(APPS_RUNNING, 0)
.checkAgainst(queueSource, true);
// As the application has failed, framework retries the same application
// based on configuration
metrics.submitAppAttempt(USER);
metrics.submitAppAttempt(USER, false);
appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker)
.gaugeInt(APPS_PENDING, 1)
.checkAgainst(queueSource, true);
metrics.runAppAttempt(app.getApplicationId(), USER);
metrics.runAppAttempt(app.getApplicationId(), USER, false);
appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker)
.gaugeInt(APPS_PENDING, 0)
.gaugeInt(APPS_RUNNING, 1)
@ -209,19 +213,19 @@ public class TestQueueMetrics {
// Suppose the application has failed this time as well.
metrics.finishAppAttempt(
app.getApplicationId(), app.isPending(), app.getUser());
app.getApplicationId(), app.isPending(), app.getUser(), false);
appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker)
.gaugeInt(APPS_RUNNING, 0)
.checkAgainst(queueSource, true);
// As the application has failed, framework retries the same application
// based on configuration
metrics.submitAppAttempt(USER);
metrics.submitAppAttempt(USER, false);
appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker)
.gaugeInt(APPS_PENDING, 1)
.checkAgainst(queueSource, true);
metrics.runAppAttempt(app.getApplicationId(), USER);
metrics.runAppAttempt(app.getApplicationId(), USER, false);
appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker)
.gaugeInt(APPS_PENDING, 0)
.gaugeInt(APPS_RUNNING, 1)
@ -229,12 +233,12 @@ public class TestQueueMetrics {
// Suppose the application has failed and there are no more retries.
metrics.finishAppAttempt(
app.getApplicationId(), app.isPending(), app.getUser());
app.getApplicationId(), app.isPending(), app.getUser(), false);
appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker)
.gaugeInt(APPS_RUNNING, 0)
.checkAgainst(queueSource, true);
metrics.finishApp(USER, RMAppState.FAILED);
metrics.finishApp(USER, RMAppState.FAILED, false);
AppMetricsChecker.createFromChecker(appMetricsChecker)
.gaugeInt(APPS_RUNNING, 0)
.counter(APPS_FAILED, 1)
@ -243,6 +247,87 @@ public class TestQueueMetrics {
assertNull(userSource);
}
/**
 * Drives an unmanaged application through three consecutive
 * submit/run/finish attempts on the queue "single" and then marks the
 * application as FAILED. After every step both the UNMANAGED_APPS_* value and
 * the corresponding APPS_* value are checked against the queue's metrics
 * source. The user source looked up after submission is expected to be null.
 */
@Test
public void testQueueUnmanagedAppMetricsForMultipleFailures() {
  final String singleQueue = "single";
  // One initial attempt followed by two retries of the same application.
  final int totalAttempts = 3;

  QueueMetrics queueMetrics = QueueMetrics.forQueue(ms, singleQueue, null,
      false, new Configuration());
  MetricsSource source = queueSource(ms, singleQueue);
  AppSchedulingInfo appInfo = mockApp(USER);

  // Register the application itself; the trailing 'true' flags it as
  // unmanaged for the metrics calls below.
  queueMetrics.submitApp(USER, true);
  MetricsSource perUserSource = userSource(ms, singleQueue, USER);
  AppMetricsChecker checker = AppMetricsChecker.create()
      .counter(UNMANAGED_APPS_SUBMITTED, 1)
      .counter(APPS_SUBMITTED, 1)
      .checkAgainst(source, true);

  for (int attempt = 1; attempt <= totalAttempts; attempt++) {
    // A new attempt is submitted: one pending app, managed and unmanaged.
    queueMetrics.submitAppAttempt(USER, true);
    checker = AppMetricsChecker.createFromChecker(checker)
        .gaugeInt(UNMANAGED_APPS_PENDING, 1)
        .gaugeInt(APPS_PENDING, 1)
        .checkAgainst(source, true);

    // The attempt starts running: pending drops to 0, running rises to 1.
    queueMetrics.runAppAttempt(appInfo.getApplicationId(), USER, true);
    checker = AppMetricsChecker.createFromChecker(checker)
        .gaugeInt(UNMANAGED_APPS_PENDING, 0)
        .gaugeInt(APPS_PENDING, 0)
        .gaugeInt(UNMANAGED_APPS_RUNNING, 1)
        .gaugeInt(APPS_RUNNING, 1)
        .checkAgainst(source, true);

    // The attempt ends; the test treats each finished attempt as a failure
    // that triggers the next retry until the attempts are exhausted.
    queueMetrics.finishAppAttempt(appInfo.getApplicationId(),
        appInfo.isPending(), appInfo.getUser(), true);
    checker = AppMetricsChecker.createFromChecker(checker)
        .gaugeInt(UNMANAGED_APPS_RUNNING, 0)
        .gaugeInt(APPS_RUNNING, 0)
        .checkAgainst(source, true);
  }

  // No retries left: the application as a whole is reported as FAILED.
  queueMetrics.finishApp(USER, RMAppState.FAILED, true);
  AppMetricsChecker.createFromChecker(checker)
      .gaugeInt(UNMANAGED_APPS_RUNNING, 0)
      .gaugeInt(APPS_RUNNING, 0)
      .counter(UNMANAGED_APPS_FAILED, 1)
      .counter(APPS_FAILED, 1)
      .checkAgainst(source, true);

  assertNull(perUserSource);
}
@Test
public void testSingleQueueWithUserMetrics() {
String queueName = "single2";
@ -252,7 +337,7 @@ public class TestQueueMetrics {
MetricsSource queueSource = queueSource(ms, queueName);
AppSchedulingInfo app = mockApp(USER_2);
metrics.submitApp(USER_2);
metrics.submitApp(USER_2, false);
MetricsSource userSource = userSource(ms, queueName, USER_2);
AppMetricsChecker appMetricsQueueSourceChecker = AppMetricsChecker.create()
@ -262,7 +347,7 @@ public class TestQueueMetrics {
.counter(APPS_SUBMITTED, 1)
.checkAgainst(userSource, true);
metrics.submitAppAttempt(USER_2);
metrics.submitAppAttempt(USER_2, false);
appMetricsQueueSourceChecker = AppMetricsChecker
.createFromChecker(appMetricsQueueSourceChecker)
.gaugeInt(APPS_PENDING, 1)
@ -298,7 +383,7 @@ public class TestQueueMetrics {
.gaugeInt(PENDING_CONTAINERS, 5)
.checkAgainst(userSource);
metrics.runAppAttempt(app.getApplicationId(), USER_2);
metrics.runAppAttempt(app.getApplicationId(), USER_2, false);
appMetricsQueueSourceChecker = AppMetricsChecker
.createFromChecker(appMetricsQueueSourceChecker)
.gaugeInt(APPS_PENDING, 0)
@ -349,7 +434,7 @@ public class TestQueueMetrics {
.checkAgainst(userSource);
metrics.finishAppAttempt(
app.getApplicationId(), app.isPending(), app.getUser());
app.getApplicationId(), app.isPending(), app.getUser(), false);
appMetricsQueueSourceChecker =
AppMetricsChecker.createFromChecker(appMetricsQueueSourceChecker)
.gaugeInt(APPS_RUNNING, 0)
@ -358,7 +443,7 @@ public class TestQueueMetrics {
AppMetricsChecker.createFromChecker(appMetricsUserSourceChecker)
.gaugeInt(APPS_RUNNING, 0)
.checkAgainst(userSource, true);
metrics.finishApp(USER_2, RMAppState.FINISHED);
metrics.finishApp(USER_2, RMAppState.FINISHED, false);
AppMetricsChecker.createFromChecker(appMetricsQueueSourceChecker)
.counter(APPS_COMPLETED, 1)
.checkAgainst(queueSource, true);
@ -382,7 +467,7 @@ public class TestQueueMetrics {
MetricsSource queueSource = queueSource(ms, leafQueueName);
//AppSchedulingInfo app = mockApp(user);
metrics.submitApp(USER);
metrics.submitApp(USER, false);
MetricsSource userSource = userSource(ms, leafQueueName, USER);
MetricsSource parentUserSource = userSource(ms, parentQueueName, USER);
@ -417,7 +502,7 @@ public class TestQueueMetrics {
QueueInfo root = new QueueInfo(null, "root", ms, conf, USER);
QueueInfo leaf = new QueueInfo(root, "root.leaf", ms, conf, USER);
leaf.queueMetrics.submitApp(USER);
leaf.queueMetrics.submitApp(USER, false);
AppMetricsChecker appMetricsQueueSourceChecker = AppMetricsChecker.create()
.counter(APPS_SUBMITTED, 1)
@ -434,7 +519,7 @@ public class TestQueueMetrics {
.counter(APPS_SUBMITTED, 1)
.checkAgainst(root.userSource, true);
leaf.queueMetrics.submitAppAttempt(USER);
leaf.queueMetrics.submitAppAttempt(USER, false);
appMetricsQueueSourceChecker =
AppMetricsChecker.createFromChecker(appMetricsQueueSourceChecker)
.gaugeInt(APPS_PENDING, 1)
@ -489,7 +574,7 @@ public class TestQueueMetrics {
.gaugeLong(PENDING_MB, 15 * GB).gaugeInt(PENDING_V_CORES, 15)
.gaugeInt(PENDING_CONTAINERS, 5).checkAgainst(root.userSource);
leaf.queueMetrics.runAppAttempt(app.getApplicationId(), USER);
leaf.queueMetrics.runAppAttempt(app.getApplicationId(), USER, false);
appMetricsQueueSourceChecker =
AppMetricsChecker.createFromChecker(appMetricsQueueSourceChecker)
.gaugeInt(APPS_PENDING, 0)
@ -603,7 +688,7 @@ public class TestQueueMetrics {
.checkAgainst(root.userSource);
leaf.queueMetrics.finishAppAttempt(
app.getApplicationId(), app.isPending(), app.getUser());
app.getApplicationId(), app.isPending(), app.getUser(), false);
appMetricsQueueSourceChecker = AppMetricsChecker
.createFromChecker(appMetricsQueueSourceChecker)
.counter(APPS_SUBMITTED, 1)
@ -627,7 +712,7 @@ public class TestQueueMetrics {
.gaugeInt(APPS_RUNNING, 0)
.checkAgainst(root.userSource, true);
leaf.queueMetrics.finishApp(USER, RMAppState.FINISHED);
leaf.queueMetrics.finishApp(USER, RMAppState.FINISHED, false);
AppMetricsChecker.createFromChecker(appMetricsQueueSourceChecker)
.counter(APPS_COMPLETED, 1)
.checkAgainst(leaf.queueSource, true);

View File

@ -126,7 +126,8 @@ public class TestSchedulerApplicationAttempt {
when(rmContext.getYarnConfiguration()).thenReturn(conf);
SchedulerApplicationAttempt app = new SchedulerApplicationAttempt(appAttId,
user, oldQueue, oldQueue.getAbstractUsersManager(), rmContext);
oldMetrics.submitApp(user);
app.appSchedulingInfo.setUnmanagedAM(false);
oldMetrics.submitApp(user, false);
// confirm that containerId is calculated based on epoch.
assertEquals(0x30000000001L, app.getNewContainerId());

View File

@ -60,6 +60,7 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
@ -79,6 +80,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
@ -131,6 +133,7 @@ public class TestLeafQueue {
CapacityScheduler cs;
CapacitySchedulerConfiguration csConf;
CapacitySchedulerContext csContext;
private RMApp rmApp;
CSQueue root;
private CSQueueStore queues;
@ -174,7 +177,7 @@ public class TestLeafQueue {
ConcurrentMap<ApplicationId, RMApp> spyApps =
spy(new ConcurrentHashMap<ApplicationId, RMApp>());
RMApp rmApp = mock(RMApp.class);
rmApp = mock(RMApp.class);
when(rmApp.getRMAppAttempt(any())).thenReturn(null);
amResourceRequest = mock(ResourceRequest.class);
when(amResourceRequest.getCapability()).thenReturn(
@ -466,7 +469,12 @@ public class TestLeafQueue {
public void testAppAttemptMetrics() throws Exception {
CSMaxRunningAppsEnforcer enforcer = mock(CSMaxRunningAppsEnforcer.class);
cs.setMaxRunningAppsEnforcer(enforcer);
ApplicationSubmissionContext applicationSubmissionContext =
mock(ApplicationSubmissionContext.class);
when(applicationSubmissionContext.getUnmanagedAM()).thenReturn(false);
when(rmApp.getApplicationSubmissionContext())
.thenReturn(applicationSubmissionContext);
when(rmApp.getCurrentAppAttempt()).thenReturn(mock(RMAppAttempt.class));
// Manipulate queue 'a'
LeafQueue a = stubLeafQueue((LeafQueue) queues.get(B));
@ -495,17 +503,18 @@ public class TestLeafQueue {
// Attempt the same application again
final ApplicationAttemptId appAttemptId_1 = TestUtils
.getMockApplicationAttemptId(0, 2);
FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a, null,
spyRMContext);
app_1.setAMResource(Resource.newInstance(100, 1));
a.submitApplicationAttempt(app_1, user_0); // same user
FiCaSchedulerApp app1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a,
null, spyRMContext);
app1.getAppSchedulingInfo().setUnmanagedAM(false);
app1.setAMResource(Resource.newInstance(100, 1));
a.submitApplicationAttempt(app1, user_0); // same user
assertEquals(1, a.getMetrics().getAppsSubmitted());
assertEquals(1, a.getMetrics().getAppsPending());
assertEquals(1, a.getUser(user_0).getActiveApplications());
assertEquals(app_1.getAMResource().getMemorySize(), a.getMetrics()
assertEquals(app1.getAMResource().getMemorySize(), a.getMetrics()
.getUsedAMResourceMB());
assertEquals(app_1.getAMResource().getVirtualCores(), a.getMetrics()
assertEquals(app1.getAMResource().getVirtualCores(), a.getMetrics()
.getUsedAMResourceVCores());
event = new AppAttemptRemovedSchedulerEvent(appAttemptId_0,
@ -524,6 +533,74 @@ public class TestLeafQueue {
assertEquals(1, userMetrics.getAppsSubmitted());
}
/**
 * Exercises the unmanaged-application queue metrics through the capacity
 * scheduler: an application is added via scheduler events, its first attempt
 * is added and removed as FAILED, a second attempt is submitted directly to
 * the leaf queue, and finally the attempt and the application are removed as
 * FINISHED. The unmanaged submitted/pending/failed/completed values of the
 * queue metrics, and the per-user submitted value, are asserted along the way.
 */
@Test
public void testUnmanagedAppAttemptMetrics() throws Exception {
// Replace the scheduler's max-running-apps enforcer with a mock.
CSMaxRunningAppsEnforcer enforcer = mock(CSMaxRunningAppsEnforcer.class);
cs.setMaxRunningAppsEnforcer(enforcer);
// Manipulate the queue held in local 'a'
// NOTE(review): the local is named 'a' but the queue is looked up under
// key B - confirm the naming is only historical.
LeafQueue a = stubLeafQueue((LeafQueue) queues.get(B));
// Users
final String user0 = "user_0";
// Submit applications
final ApplicationAttemptId appAttemptId0 = TestUtils
.getMockApplicationAttemptId(0, 1);
// NOTE(review): presumably the first positional 'true' below is the
// unmanaged-AM flag of ApplicationSubmissionContext.newInstance - confirm
// against that overload's parameter order.
ApplicationSubmissionContext applicationSubmissionContext =
ApplicationSubmissionContext.newInstance(
appAttemptId0.getApplicationId(), "test", a.getQueuePath(),
Priority.newInstance(0), null, true, true,
2, null, "test");
// Add the application to the scheduler.
AppAddedSchedulerEvent addAppEvent =
new AppAddedSchedulerEvent(user0, applicationSubmissionContext, false,
null);
cs.handle(addAppEvent);
// Add the first attempt of that application.
AppAttemptAddedSchedulerEvent addAttemptEvent =
new AppAttemptAddedSchedulerEvent(appAttemptId0, false);
cs.handle(addAttemptEvent);
// Remove the first attempt again, reporting it as FAILED.
AppAttemptRemovedSchedulerEvent event = new AppAttemptRemovedSchedulerEvent(
appAttemptId0, RMAppAttemptState.FAILED, false);
cs.handle(event);
// A failed attempt must leave nothing pending and must not count the
// application itself as failed.
assertEquals(0, a.getMetrics().getUnmanagedAppsPending());
assertEquals(0, a.getMetrics().getUnmanagedAppsFailed());
// Attempt the same application again
final ApplicationAttemptId appAttemptId1 = TestUtils
.getMockApplicationAttemptId(0, 2);
FiCaSchedulerApp app1 = new FiCaSchedulerApp(appAttemptId1, user0, a,
null, spyRMContext);
app1.setAMResource(Resource.newInstance(100, 1));
// The second attempt is handed to the leaf queue directly rather than
// through a scheduler event.
a.submitApplicationAttempt(app1, user0); // same user
// Still a single submitted application, now with one pending attempt.
assertEquals(1, a.getMetrics().getUnmanagedAppsSubmitted());
assertEquals(1, a.getMetrics().getUnmanagedAppsPending());
assertEquals(1, a.getUser(user0).getActiveApplications());
// The queue's used AM resource must equal the AM resource set on app1.
assertEquals(app1.getAMResource().getMemorySize(), a.getMetrics()
.getUsedAMResourceMB());
assertEquals(app1.getAMResource().getVirtualCores(), a.getMetrics()
.getUsedAMResourceVCores());
// NOTE(review): this removal event names appAttemptId0 (already removed
// above), not appAttemptId1 which was just submitted - confirm this is
// intended.
event = new AppAttemptRemovedSchedulerEvent(appAttemptId0,
RMAppAttemptState.FINISHED, false);
cs.handle(event);
// Remove the application itself as FINISHED.
AppRemovedSchedulerEvent rEvent = new AppRemovedSchedulerEvent(
appAttemptId0.getApplicationId(), RMAppState.FINISHED);
cs.handle(rEvent);
// Final state: one submitted and one completed unmanaged application,
// nothing pending and nothing failed.
assertEquals(1, a.getMetrics().getUnmanagedAppsSubmitted());
assertEquals(0, a.getMetrics().getUnmanagedAppsPending());
assertEquals(0, a.getMetrics().getUnmanagedAppsFailed());
assertEquals(1, a.getMetrics().getUnmanagedAppsCompleted());
// The per-user metrics must report the same single submission.
QueueMetrics userMetrics = a.getMetrics().getUserMetrics(user0);
assertEquals(1, userMetrics.getUnmanagedAppsSubmitted());
}
@Test
public void testFairConfiguration() throws Exception {

View File

@ -796,7 +796,7 @@ public class TestFifoScheduler {
scheduler.handle(new NodeAddedSchedulerEvent(node));
ApplicationId appId = ApplicationId.newInstance(0, 1);
scheduler.addApplication(appId, "queue1", "user1", false);
scheduler.addApplication(appId, "queue1", "user1", false, false);
NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node);
try {