YARN-6546. SLS is slow while loading 10k queues. (Yufei Gu via Haibo Chen)
This commit is contained in:
parent
f27a4ad032
commit
46eb1033a8
@ -731,6 +731,7 @@ private void increaseQueueAppNum(String queue) throws YarnException {
|
||||
}
|
||||
|
||||
queueAppNumMap.put(queueName, appNum);
|
||||
wrapper.getSchedulerMetrics().trackQueue(queueName);
|
||||
}
|
||||
|
||||
private void runNewAM(String jobType, String user,
|
||||
|
@ -28,9 +28,4 @@ public class CapacitySchedulerMetrics extends SchedulerMetrics {
|
||||
public CapacitySchedulerMetrics() {
|
||||
super();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void trackQueue(String queueName) {
|
||||
trackedQueues.add(queueName);
|
||||
}
|
||||
}
|
||||
|
@ -167,8 +167,9 @@ public Integer getValue() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void trackQueue(String queueName) {
|
||||
trackedQueues.add(queueName);
|
||||
protected void registerQueueMetrics(String queueName) {
|
||||
super.registerQueueMetrics(queueName);
|
||||
|
||||
FairScheduler fair = (FairScheduler) scheduler;
|
||||
final FSQueue queue = fair.getQueueManager().getQueue(queueName);
|
||||
registerQueueMetrics(queue, Metric.DEMAND);
|
||||
@ -209,16 +210,4 @@ public Integer getValue() {
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void untrackQueue(String queueName) {
|
||||
trackedQueues.remove(queueName);
|
||||
|
||||
for (Metric metric: Metric.values()) {
|
||||
metrics.remove("variable.queue." + queueName + "." +
|
||||
metric.value + ".memory");
|
||||
metrics.remove("variable.queue." + queueName + "." +
|
||||
metric.value + ".vcores");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -35,8 +35,9 @@ public FifoSchedulerMetrics() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void trackQueue(String queueName) {
|
||||
trackedQueues.add(queueName);
|
||||
protected void registerQueueMetrics(String queueName) {
|
||||
super.registerQueueMetrics(queueName);
|
||||
|
||||
FifoScheduler fifo = (FifoScheduler) scheduler;
|
||||
// for FifoScheduler, only DEFAULT_QUEUE
|
||||
// here the three parameters doesn't affect results
|
||||
|
@ -323,25 +323,6 @@ private void updateQueueWithAllocateRequest(Allocation allocation,
|
||||
queueName);
|
||||
}
|
||||
|
||||
private void initQueueMetrics(CSQueue queue) {
|
||||
if (queue instanceof LeafQueue) {
|
||||
schedulerMetrics.initQueueMetric(queue.getQueueName());
|
||||
return;
|
||||
}
|
||||
|
||||
for (CSQueue child : queue.getChildQueues()) {
|
||||
initQueueMetrics(child);
|
||||
}
|
||||
}
|
||||
@Override
|
||||
public void serviceInit(Configuration configuration) throws Exception {
|
||||
super.serviceInit(configuration);
|
||||
|
||||
if (metricsON) {
|
||||
initQueueMetrics(getRootQueue());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void serviceStop() throws Exception {
|
||||
try {
|
||||
|
@ -306,25 +306,6 @@ private void updateQueueWithAllocateRequest(Allocation allocation,
|
||||
queueName);
|
||||
}
|
||||
|
||||
private void initQueueMetrics(FSQueue queue) {
|
||||
if (queue instanceof FSLeafQueue) {
|
||||
schedulerMetrics.initQueueMetric(queue.getQueueName());
|
||||
return;
|
||||
}
|
||||
|
||||
for (FSQueue child : queue.getChildQueues()) {
|
||||
initQueueMetrics(child);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void serviceInit(Configuration conf) throws Exception {
|
||||
super.serviceInit(conf);
|
||||
if (metricsON) {
|
||||
initQueueMetrics(getQueueManager().getRootQueue());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void serviceStop() throws Exception {
|
||||
try {
|
||||
|
@ -234,14 +234,35 @@ public void untrackApp(String oldAppId) {
|
||||
}
|
||||
}
|
||||
|
||||
public abstract void trackQueue(String queueName);
|
||||
|
||||
public void untrackQueue(String queueName) {
|
||||
for (String m : queueTrackedMetrics) {
|
||||
metrics.remove("variable.queue." + queueName + "." + m);
|
||||
/**
|
||||
* Track a queue by registering its metrics.
|
||||
*
|
||||
* @param queue queue name
|
||||
*/
|
||||
public void trackQueue(String queue) {
|
||||
queueLock.lock();
|
||||
try {
|
||||
if (!isTracked(queue)) {
|
||||
trackedQueues.add(queue);
|
||||
registerQueueMetrics(queue);
|
||||
}
|
||||
} finally {
|
||||
queueLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
protected void registerQueueMetrics(String queueName) {
|
||||
SortedMap<String, Counter> counterMap = metrics.getCounters();
|
||||
|
||||
for (QueueMetric queueMetric : QueueMetric.values()) {
|
||||
String metricName = getQueueMetricName(queueName, queueMetric);
|
||||
if (!counterMap.containsKey(metricName)) {
|
||||
metrics.counter(metricName);
|
||||
queueTrackedMetrics.add(metricName);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public boolean isTracked(String queueName) {
|
||||
return trackedQueues.contains(queueName);
|
||||
}
|
||||
@ -547,40 +568,13 @@ private String getQueueMetricName(String queue, QueueMetric metric) {
|
||||
return "counter.queue." + queue + "." + metric.value;
|
||||
}
|
||||
|
||||
private void traceQueueIfNotTraced(String queue) {
|
||||
queueLock.lock();
|
||||
try {
|
||||
if (!isTracked(queue)) {
|
||||
trackQueue(queue);
|
||||
}
|
||||
} finally {
|
||||
queueLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
void initQueueMetric(String queueName){
|
||||
SortedMap<String, Counter> counterMap = metrics.getCounters();
|
||||
|
||||
for (QueueMetric queueMetric : QueueMetric.values()) {
|
||||
String metricName = getQueueMetricName(queueName, queueMetric);
|
||||
if (!counterMap.containsKey(metricName)) {
|
||||
metrics.counter(metricName);
|
||||
counterMap = metrics.getCounters();
|
||||
}
|
||||
}
|
||||
|
||||
traceQueueIfNotTraced(queueName);
|
||||
}
|
||||
|
||||
void updateQueueMetrics(Resource pendingResource, Resource allocatedResource,
|
||||
String queueName) {
|
||||
trackQueue(queueName);
|
||||
|
||||
SortedMap<String, Counter> counterMap = metrics.getCounters();
|
||||
for(QueueMetric metric : QueueMetric.values()) {
|
||||
String metricName = getQueueMetricName(queueName, metric);
|
||||
if (!counterMap.containsKey(metricName)) {
|
||||
metrics.counter(metricName);
|
||||
counterMap = metrics.getCounters();
|
||||
}
|
||||
|
||||
if (metric == QueueMetric.PENDING_MEMORY) {
|
||||
counterMap.get(metricName).inc(pendingResource.getMemorySize());
|
||||
@ -592,8 +586,6 @@ void updateQueueMetrics(Resource pendingResource, Resource allocatedResource,
|
||||
counterMap.get(metricName).inc(allocatedResource.getVirtualCores());
|
||||
}
|
||||
}
|
||||
|
||||
traceQueueIfNotTraced(queueName);
|
||||
}
|
||||
|
||||
void updateQueueMetricsByRelease(Resource releaseResource, String queue) {
|
||||
|
Loading…
Reference in New Issue
Block a user