MAPREDUCE-3328. mapred queue -list output inconsistent and missing child queues. (Ravi Prakash via mahadev)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1213504 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
14e6f1e796
commit
9b75b05163
@ -267,6 +267,9 @@ Release 0.23.1 - Unreleased
|
||||
MAPREDUCE-3527. Fix minor API incompatibilities between 1.0 and 0.23.
|
||||
(tomwhite)
|
||||
|
||||
MAPREDUCE-3328. mapred queue -list output inconsistent and missing child
|
||||
queues. (Ravi Prakash via mahadev)
|
||||
|
||||
Release 0.23.0 - 2011-11-01
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
@ -450,9 +450,19 @@ public static JobStatus[] fromYarnApps(List<ApplicationReport> applications,
|
||||
|
||||
public static QueueInfo fromYarn(org.apache.hadoop.yarn.api.records.QueueInfo
|
||||
queueInfo, Configuration conf) {
|
||||
return new QueueInfo(queueInfo.getQueueName(),queueInfo.toString(),
|
||||
fromYarn(queueInfo.getQueueState()), TypeConverter.fromYarnApps(
|
||||
queueInfo.getApplications(), conf));
|
||||
QueueInfo toReturn = new QueueInfo(queueInfo.getQueueName(), "Capacity: " +
|
||||
queueInfo.getCapacity() * 100 + ", MaximumCapacity: " +
|
||||
(queueInfo.getMaximumCapacity() < 0 ? "UNDEFINED" :
|
||||
queueInfo.getMaximumCapacity()) + ", CurrentCapacity: " +
|
||||
queueInfo.getCurrentCapacity() * 100, fromYarn(queueInfo.getQueueState()),
|
||||
TypeConverter.fromYarnApps(queueInfo.getApplications(), conf));
|
||||
List<QueueInfo> childQueues = new ArrayList<QueueInfo>();
|
||||
for(org.apache.hadoop.yarn.api.records.QueueInfo childQueue :
|
||||
queueInfo.getChildQueues()) {
|
||||
childQueues.add(fromYarn(childQueue, conf));
|
||||
}
|
||||
toReturn.setQueueChildren(childQueues);
|
||||
return toReturn;
|
||||
}
|
||||
|
||||
public static QueueInfo[] fromYarnQueueInfo(
|
||||
|
@ -17,6 +17,9 @@
|
||||
*/
|
||||
package org.apache.hadoop.mapreduce;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import junit.framework.Assert;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
@ -36,6 +39,7 @@
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
public class TestTypeConverter {
|
||||
@Test
|
||||
@ -134,4 +138,33 @@ public void testFromYarnQueueInfo() {
|
||||
Assert.assertEquals("queueInfo translation didn't work.",
|
||||
returned.getState().toString(), queueInfo.getQueueState().toString().toLowerCase());
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that child queues are converted too during conversion of the parent
|
||||
* queue
|
||||
*/
|
||||
@Test
|
||||
public void testFromYarnQueue() {
|
||||
//Define child queue
|
||||
org.apache.hadoop.yarn.api.records.QueueInfo child =
|
||||
Mockito.mock(org.apache.hadoop.yarn.api.records.QueueInfo.class);
|
||||
Mockito.when(child.getQueueState()).thenReturn(QueueState.RUNNING);
|
||||
|
||||
//Define parent queue
|
||||
org.apache.hadoop.yarn.api.records.QueueInfo queueInfo =
|
||||
Mockito.mock(org.apache.hadoop.yarn.api.records.QueueInfo.class);
|
||||
List<org.apache.hadoop.yarn.api.records.QueueInfo> children =
|
||||
new ArrayList<org.apache.hadoop.yarn.api.records.QueueInfo>();
|
||||
children.add(child); //Add one child
|
||||
Mockito.when(queueInfo.getChildQueues()).thenReturn(children);
|
||||
Mockito.when(queueInfo.getQueueState()).thenReturn(QueueState.RUNNING);
|
||||
|
||||
//Call the function we're testing
|
||||
org.apache.hadoop.mapreduce.QueueInfo returned =
|
||||
TypeConverter.fromYarn(queueInfo, new Configuration());
|
||||
|
||||
//Verify that the converted queue has the 1 child we had added
|
||||
Assert.assertEquals("QueueInfo children weren't properly converted",
|
||||
returned.getQueueChildren().size(), 1);
|
||||
}
|
||||
}
|
||||
|
@ -110,40 +110,33 @@ public int run(String[] argv) throws Exception {
|
||||
}
|
||||
|
||||
// format and print information about the passed in job queue.
|
||||
void printJobQueueInfo(JobQueueInfo jobQueueInfo, Writer writer)
|
||||
throws IOException {
|
||||
void printJobQueueInfo(JobQueueInfo jobQueueInfo, Writer writer,
|
||||
String prefix) throws IOException {
|
||||
if (jobQueueInfo == null) {
|
||||
writer.write("No queue found.\n");
|
||||
writer.flush();
|
||||
return;
|
||||
}
|
||||
writer.write(String.format("Queue Name : %s \n",
|
||||
writer.write(String.format(prefix + "======================\n"));
|
||||
writer.write(String.format(prefix + "Queue Name : %s \n",
|
||||
jobQueueInfo.getQueueName()));
|
||||
writer.write(String.format("Queue State : %s \n",
|
||||
writer.write(String.format(prefix + "Queue State : %s \n",
|
||||
jobQueueInfo.getQueueState()));
|
||||
writer.write(String.format("Scheduling Info : %s \n",
|
||||
writer.write(String.format(prefix + "Scheduling Info : %s \n",
|
||||
jobQueueInfo.getSchedulingInfo()));
|
||||
List<JobQueueInfo> childQueues = jobQueueInfo.getChildren();
|
||||
if (childQueues != null && childQueues.size() > 0) {
|
||||
writer.write(String.format("Child Queues : "));
|
||||
for (int i = 0; i < childQueues.size(); i++) {
|
||||
JobQueueInfo childQueue = childQueues.get(i);
|
||||
writer.write(String.format("%s", childQueue.getQueueName()));
|
||||
if (i != childQueues.size() - 1) {
|
||||
writer.write(String.format(", "));
|
||||
}
|
||||
printJobQueueInfo(childQueues.get(i), writer, " " + prefix);
|
||||
}
|
||||
writer.write("\n");
|
||||
}
|
||||
writer.write(String.format("======================\n"));
|
||||
writer.flush();
|
||||
}
|
||||
|
||||
private void displayQueueList() throws IOException {
|
||||
JobQueueInfo[] rootQueues = jc.getRootQueues();
|
||||
List<JobQueueInfo> allQueues = expandQueueList(rootQueues);
|
||||
for (JobQueueInfo queue : allQueues) {
|
||||
printJobQueueInfo(queue, new PrintWriter(System.out));
|
||||
for (JobQueueInfo queue : rootQueues) {
|
||||
printJobQueueInfo(queue, new PrintWriter(System.out), "");
|
||||
}
|
||||
}
|
||||
|
||||
@ -181,7 +174,7 @@ private void displayQueueInfo(String queue, boolean showJobs)
|
||||
System.out.println("Queue \"" + queue + "\" does not exist.");
|
||||
return;
|
||||
}
|
||||
printJobQueueInfo(jobQueueInfo, new PrintWriter(System.out));
|
||||
printJobQueueInfo(jobQueueInfo, new PrintWriter(System.out), "");
|
||||
if (showJobs && (jobQueueInfo.getChildren() == null ||
|
||||
jobQueueInfo.getChildren().size() == 0)) {
|
||||
JobStatus[] jobs = jc.getJobsFromQueue(queue);
|
||||
|
@ -105,7 +105,7 @@ protected void setChildren(List<JobQueueInfo> children) {
|
||||
public List<JobQueueInfo> getChildren() {
|
||||
List<JobQueueInfo> list = new ArrayList<JobQueueInfo>();
|
||||
for (QueueInfo q : super.getQueueChildren()) {
|
||||
list.add((JobQueueInfo)q);
|
||||
list.add(new JobQueueInfo(q));
|
||||
}
|
||||
return list;
|
||||
}
|
||||
|
@ -0,0 +1,54 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
|
||||
package org.apache.hadoop.mapred;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.PrintWriter;
|
||||
|
||||
import junit.framework.Assert;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
|
||||
public class TestJobQueueClient {
|
||||
/**
|
||||
* Test that print job queue recursively prints child queues
|
||||
*/
|
||||
@Test
|
||||
@SuppressWarnings("deprecation")
|
||||
public void testPrintJobQueueInfo() throws IOException {
|
||||
JobQueueClient queueClient = new JobQueueClient();
|
||||
JobQueueInfo parent = new JobQueueInfo();
|
||||
JobQueueInfo child = new JobQueueInfo();
|
||||
JobQueueInfo grandChild = new JobQueueInfo();
|
||||
child.addChild(grandChild);
|
||||
parent.addChild(child);
|
||||
grandChild.setQueueName("GrandChildQueue");
|
||||
|
||||
ByteArrayOutputStream bbos = new ByteArrayOutputStream();
|
||||
PrintWriter writer = new PrintWriter(bbos);
|
||||
queueClient.printJobQueueInfo(parent, writer, "");
|
||||
|
||||
Assert.assertTrue("printJobQueueInfo did not print grandchild's name",
|
||||
bbos.toString().contains("GrandChildQueue"));
|
||||
}
|
||||
|
||||
}
|
@ -198,13 +198,16 @@ public QueueInfo getQueue(String queueName) throws IOException,
|
||||
}
|
||||
|
||||
private void getChildQueues(org.apache.hadoop.yarn.api.records.QueueInfo parent,
|
||||
List<org.apache.hadoop.yarn.api.records.QueueInfo> queues) {
|
||||
List<org.apache.hadoop.yarn.api.records.QueueInfo> queues,
|
||||
boolean recursive) {
|
||||
List<org.apache.hadoop.yarn.api.records.QueueInfo> childQueues =
|
||||
parent.getChildQueues();
|
||||
|
||||
for (org.apache.hadoop.yarn.api.records.QueueInfo child : childQueues) {
|
||||
queues.add(child);
|
||||
getChildQueues(child, queues);
|
||||
if(recursive) {
|
||||
getChildQueues(child, queues, recursive);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -226,7 +229,7 @@ public QueueInfo[] getQueues() throws IOException, InterruptedException {
|
||||
org.apache.hadoop.yarn.api.records.QueueInfo rootQueue =
|
||||
applicationsManager.getQueueInfo(
|
||||
getQueueInfoRequest(ROOT, false, true, true)).getQueueInfo();
|
||||
getChildQueues(rootQueue, queues);
|
||||
getChildQueues(rootQueue, queues, true);
|
||||
|
||||
return TypeConverter.fromYarnQueueInfo(queues, this.conf);
|
||||
}
|
||||
@ -238,8 +241,8 @@ public QueueInfo[] getRootQueues() throws IOException, InterruptedException {
|
||||
|
||||
org.apache.hadoop.yarn.api.records.QueueInfo rootQueue =
|
||||
applicationsManager.getQueueInfo(
|
||||
getQueueInfoRequest(ROOT, false, true, false)).getQueueInfo();
|
||||
getChildQueues(rootQueue, queues);
|
||||
getQueueInfoRequest(ROOT, false, true, true)).getQueueInfo();
|
||||
getChildQueues(rootQueue, queues, false);
|
||||
|
||||
return TypeConverter.fromYarnQueueInfo(queues, this.conf);
|
||||
}
|
||||
@ -252,7 +255,7 @@ public QueueInfo[] getChildQueues(String parent) throws IOException,
|
||||
org.apache.hadoop.yarn.api.records.QueueInfo parentQueue =
|
||||
applicationsManager.getQueueInfo(
|
||||
getQueueInfoRequest(parent, false, true, false)).getQueueInfo();
|
||||
getChildQueues(parentQueue, queues);
|
||||
getChildQueues(parentQueue, queues, true);
|
||||
|
||||
return TypeConverter.fromYarnQueueInfo(queues, this.conf);
|
||||
}
|
||||
|
@ -0,0 +1,63 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.mapred;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import junit.framework.Assert;
|
||||
|
||||
import org.apache.hadoop.yarn.api.ClientRMProtocol;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.junit.Test;
|
||||
import org.mockito.ArgumentCaptor;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
public class TestResourceMgrDelegate {
|
||||
|
||||
/**
|
||||
* Tests that getRootQueues makes a request for the (recursive) child queues
|
||||
*/
|
||||
@Test
|
||||
public void testGetRootQueues() throws IOException, InterruptedException {
|
||||
ClientRMProtocol applicationsManager = Mockito.mock(ClientRMProtocol.class);
|
||||
GetQueueInfoResponse response = Mockito.mock(GetQueueInfoResponse.class);
|
||||
org.apache.hadoop.yarn.api.records.QueueInfo queueInfo =
|
||||
Mockito.mock(org.apache.hadoop.yarn.api.records.QueueInfo.class);
|
||||
Mockito.when(response.getQueueInfo()).thenReturn(queueInfo);
|
||||
Mockito.when(applicationsManager.getQueueInfo(Mockito.any(
|
||||
GetQueueInfoRequest.class))).thenReturn(response);
|
||||
|
||||
ResourceMgrDelegate delegate = new ResourceMgrDelegate(
|
||||
new YarnConfiguration(), applicationsManager);
|
||||
delegate.getRootQueues();
|
||||
|
||||
ArgumentCaptor<GetQueueInfoRequest> argument =
|
||||
ArgumentCaptor.forClass(GetQueueInfoRequest.class);
|
||||
Mockito.verify(delegate.applicationsManager).getQueueInfo(
|
||||
argument.capture());
|
||||
|
||||
Assert.assertTrue("Children of root queue not requested",
|
||||
argument.getValue().getIncludeChildQueues());
|
||||
Assert.assertTrue("Request wasn't to recurse through children",
|
||||
argument.getValue().getRecursive());
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue
Block a user