YARN-10663. Add runningApps stats in SLS. Contributed by Vadaga Ananyo Rao

This commit is contained in:
Szilard Nemeth 2021-07-29 17:37:40 +02:00
parent 54f9fff218
commit 74770c8a16
9 changed files with 176 additions and 8 deletions

View File

@ -73,6 +73,11 @@
<groupId>com.fasterxml.jackson.core</groupId> <groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId> <artifactId>jackson-databind</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
</dependencies> </dependencies>
<build> <build>

View File

@ -25,7 +25,9 @@
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
@ -50,6 +52,7 @@
import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
@ -118,6 +121,8 @@ public abstract class AMSimulator extends TaskRunner.Task {
private Map<ApplicationId, AMSimulator> appIdToAMSim; private Map<ApplicationId, AMSimulator> appIdToAMSim;
private Set<NodeId> ranNodes = new ConcurrentSkipListSet<NodeId>();
public AMSimulator() { public AMSimulator() {
this.responseQueue = new LinkedBlockingQueue<>(); this.responseQueue = new LinkedBlockingQueue<>();
} }
@ -236,6 +241,11 @@ public void lastStep() throws Exception {
LOG.info("AM container is null"); LOG.info("AM container is null");
} }
// Clear runningApps for ranNodes of this app
for (NodeId nodeId : ranNodes) {
se.getNmMap().get(nodeId).finishApplication(getApplicationId());
}
if (null == appAttemptId) { if (null == appAttemptId) {
// If appAttemptId == null, AM is not launched from RM's perspective, so // If appAttemptId == null, AM is not launched from RM's perspective, so
// it's unnecessary to finish am as well // it's unnecessary to finish am as well
@ -497,4 +507,8 @@ public ApplicationId getApplicationId() {
public ApplicationAttemptId getApplicationAttemptId() { public ApplicationAttemptId getApplicationAttemptId() {
return appAttemptId; return appAttemptId;
} }
/**
 * Returns the set of nodes this application has run containers on.
 * The set is backed by a {@code ConcurrentSkipListSet}, so it can be
 * read safely while the simulated AM is still adding entries.
 *
 * @return the nodes that hosted containers of this application
 */
public Set<NodeId> getRanNodes() {
  return ranNodes;
}
} }

View File

@ -189,7 +189,8 @@ protected void processResponseQueue() throws Exception {
appId, container.getId()); appId, container.getId());
assignedContainers.put(container.getId(), cs); assignedContainers.put(container.getId(), cs);
se.getNmMap().get(container.getNodeId()) se.getNmMap().get(container.getNodeId())
.addNewContainer(container, cs.getLifeTime()); .addNewContainer(container, cs.getLifeTime(), appId);
getRanNodes().add(container.getNodeId());
} }
} }
} }

View File

@ -231,14 +231,16 @@ protected void processResponseQueue() throws Exception {
appId, container.getId()); appId, container.getId());
assignedMaps.put(container.getId(), cs); assignedMaps.put(container.getId(), cs);
se.getNmMap().get(container.getNodeId()) se.getNmMap().get(container.getNodeId())
.addNewContainer(container, cs.getLifeTime()); .addNewContainer(container, cs.getLifeTime(), appId);
getRanNodes().add(container.getNodeId());
} else if (! this.scheduledReduces.isEmpty()) { } else if (! this.scheduledReduces.isEmpty()) {
ContainerSimulator cs = scheduledReduces.remove(); ContainerSimulator cs = scheduledReduces.remove();
LOG.debug("Application {} starts to launch a reducer ({}).", LOG.debug("Application {} starts to launch a reducer ({}).",
appId, container.getId()); appId, container.getId());
assignedReduces.put(container.getId(), cs); assignedReduces.put(container.getId(), cs);
se.getNmMap().get(container.getNodeId()) se.getNmMap().get(container.getNodeId())
.addNewContainer(container, cs.getLifeTime()); .addNewContainer(container, cs.getLifeTime(), appId);
getRanNodes().add(container.getNodeId());
} }
} }
} }

View File

@ -172,7 +172,8 @@ protected void processResponseQueue() throws Exception {
container.getId()); container.getId());
assignedStreams.put(container.getId(), cs); assignedStreams.put(container.getId(), cs);
se.getNmMap().get(container.getNodeId()).addNewContainer(container, se.getNmMap().get(container.getNodeId()).addNewContainer(container,
cs.getLifeTime()); cs.getLifeTime(), appId);
getRanNodes().add(container.getNodeId());
} }
} }
} }

View File

@ -250,7 +250,8 @@ public RMNode getNode() {
/** /**
* launch a new container with the given life time * launch a new container with the given life time
*/ */
public void addNewContainer(Container container, long lifeTimeMS) { public void addNewContainer(Container container, long lifeTimeMS,
ApplicationId applicationId) {
LOG.debug("NodeManager {} launches a new container ({}).", LOG.debug("NodeManager {} launches a new container ({}).",
node.getNodeID(), container.getId()); node.getNodeID(), container.getId());
if (lifeTimeMS != -1) { if (lifeTimeMS != -1) {
@ -267,6 +268,15 @@ public void addNewContainer(Container container, long lifeTimeMS) {
amContainerList.add(container.getId()); amContainerList.add(container.getId());
} }
} }
// update runningApplications on the node
if (applicationId != null
&& !getNode().getRunningApps().contains(applicationId)) {
getNode().getRunningApps().add(applicationId);
}
LOG.debug("Adding running app: {} on node: {}. " +
"Updated runningApps on this node are: {}",
applicationId, getNode().getNodeID(), getNode().getRunningApps());
} }
/** /**
@ -296,4 +306,13 @@ List<ContainerId> getAMContainers() {
List<ContainerId> getCompletedContainers() { List<ContainerId> getCompletedContainers() {
return completedContainerList; return completedContainerList;
} }
/**
 * Removes the given application from this node's runningApps list, if it
 * is present. Invoked when a simulated AM finishes, once for every node
 * the application ran containers on. A no-op for unknown applications.
 *
 * @param applicationId the application to remove from this node
 */
public void finishApplication(ApplicationId applicationId) {
  // Collection.remove(Object) returns true only when the element was
  // present, so one call replaces the original contains()+remove() pair:
  // a single lookup and no check-then-act window between the two calls.
  if (getNode().getRunningApps().remove(applicationId)) {
    LOG.debug("Removed running app: {} from node: {}. " +
        "Updated runningApps on this node are: {}",
        applicationId, getNode().getNodeID(), getNode().getRunningApps());
  }
}
} }

View File

@ -104,7 +104,8 @@ public void handle(AMLauncherEvent event) {
LOG.info("Notify AM launcher launched:" + amContainer.getId()); LOG.info("Notify AM launcher launched:" + amContainer.getId());
se.getNmMap().get(amContainer.getNodeId()) se.getNmMap().get(amContainer.getNodeId())
.addNewContainer(amContainer, -1); .addNewContainer(amContainer, -1, appId);
ams.getRanNodes().add(amContainer.getNodeId());
return; return;
} catch (Exception e) { } catch (Exception e) {
throw new YarnRuntimeException(e); throw new YarnRuntimeException(e);

View File

@ -22,6 +22,7 @@
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest;
@ -32,7 +33,9 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.sls.SLSRunner;
import org.apache.hadoop.yarn.sls.conf.SLSConfiguration; import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
import org.apache.hadoop.yarn.sls.nodemanager.NMSimulator;
import org.apache.hadoop.yarn.sls.scheduler.*; import org.apache.hadoop.yarn.sls.scheduler.*;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.After; import org.junit.After;
@ -41,6 +44,7 @@
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.junit.runners.Parameterized; import org.junit.runners.Parameterized;
import org.mockito.Mockito;
import java.io.IOException; import java.io.IOException;
import java.nio.file.Files; import java.nio.file.Files;
@ -50,8 +54,11 @@
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import static org.mockito.Mockito.when;
@RunWith(Parameterized.class) @RunWith(Parameterized.class)
public class TestAMSimulator { public class TestAMSimulator {
private ResourceManager rm; private ResourceManager rm;
@ -288,6 +295,28 @@ public void testPackageRequests() {
Assert.assertEquals(3, nodeRequestCount); Assert.assertEquals(3, nodeRequestCount);
} }
/**
 * Verifies that {@code AMSimulator.lastStep()} clears the finished
 * application from the runningApps list of every node it ran on.
 */
@Test
public void testAMSimulatorRanNodesCleared() throws Exception {
  // One simulated NM, registered in a node map under its node id.
  NMSimulator nmSim = new NMSimulator();
  nmSim.init("/rack1/testNode1", Resources.createResource(1024 * 10, 10), 0,
      1000, rm, -1f);
  Map<NodeId, NMSimulator> nodeMap = new HashMap<>();
  nodeMap.put(nmSim.getNode().getNodeID(), nmSim);

  // An AM whose SLSRunner is mocked to serve the node map above.
  MockAMSimulator amSim = new MockAMSimulator();
  amSim.appId = ApplicationId.newInstance(0L, 1);
  SLSRunner runner = Mockito.mock(SLSRunner.class);
  when(runner.getNmMap()).thenReturn(nodeMap);
  amSim.se = runner;

  // Pretend the app ran a container on the node.
  amSim.getRanNodes().add(nmSim.getNode().getNodeID());
  nmSim.getNode().getRunningApps().add(amSim.appId);
  Assert.assertTrue(nmSim.getNode().getRunningApps().contains(amSim.appId));

  // lastStep() must remove the app from the node's runningApps.
  amSim.lastStep();
  Assert.assertFalse(nmSim.getNode().getRunningApps().contains(amSim.appId));
  Assert.assertTrue(nmSim.getNode().getRunningApps().isEmpty());
}
@After @After
public void tearDown() { public void tearDown() {

View File

@ -19,6 +19,8 @@
import java.util.function.Supplier; import java.util.function.Supplier;
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
@ -109,7 +111,7 @@ public void testNMSimulator() throws Exception {
ContainerId cId1 = newContainerId(1, 1, 1); ContainerId cId1 = newContainerId(1, 1, 1);
Container container1 = Container.newInstance(cId1, null, null, Container container1 = Container.newInstance(cId1, null, null,
Resources.createResource(GB, 1), null, null); Resources.createResource(GB, 1), null, null);
node1.addNewContainer(container1, 100000l); node1.addNewContainer(container1, 100000l, null);
Assert.assertTrue("Node1 should have one running container.", Assert.assertTrue("Node1 should have one running container.",
node1.getRunningContainers().containsKey(cId1)); node1.getRunningContainers().containsKey(cId1));
@ -117,7 +119,7 @@ public void testNMSimulator() throws Exception {
ContainerId cId2 = newContainerId(2, 1, 1); ContainerId cId2 = newContainerId(2, 1, 1);
Container container2 = Container.newInstance(cId2, null, null, Container container2 = Container.newInstance(cId2, null, null,
Resources.createResource(GB, 1), null, null); Resources.createResource(GB, 1), null, null);
node1.addNewContainer(container2, -1l); node1.addNewContainer(container2, -1l, null);
Assert.assertTrue("Node1 should have one running AM container", Assert.assertTrue("Node1 should have one running AM container",
node1.getAMContainers().contains(cId2)); node1.getAMContainers().contains(cId2));
@ -137,6 +139,100 @@ private ContainerId newContainerId(int appId, int appAttemptId, int cId) {
appAttemptId), cId); appAttemptId), cId);
} }
/**
 * Verifies that an application id is added to a node's runningApps list
 * when a container of that app is launched, and removed again when the
 * app finishes on the node.
 */
@Test
public void testNMSimAppAddedAndRemoved() throws Exception {
  // Register one node and wait until the RM has admitted it and its
  // resources are visible to the scheduler.
  NMSimulator node = new NMSimulator();
  node.init("/rack1/node1", Resources.createResource(GB * 10, 10), 0, 1000,
      rm, -1f);
  node.middleStep();
  int numClusterNodes = rm.getResourceScheduler().getNumClusterNodes();
  int cumulativeSleepTime = 0;
  int sleepInterval = 100;
  while (numClusterNodes != 1 && cumulativeSleepTime < 5000) {
    Thread.sleep(sleepInterval);
    cumulativeSleepTime = cumulativeSleepTime + sleepInterval;
    numClusterNodes = rm.getResourceScheduler().getNumClusterNodes();
  }
  GenericTestUtils.waitFor(new com.google.common.base.Supplier<Boolean>() {
    @Override
    public Boolean get() {
      return rm.getResourceScheduler().getRootQueueMetrics()
          .getAvailableMB() > 0;
    }
  }, 500, 10000);
  // JUnit's assertEquals signature is (message, expected, actual):
  // the expected value goes first so failure messages read correctly.
  Assert.assertEquals("Node should have no runningApps.",
      0, node.getNode().getRunningApps().size());

  // Allocate one app container on the node; the app id must then appear
  // in the node's runningApps list.
  ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
  ApplicationAttemptId appAttemptId =
      BuilderUtils.newApplicationAttemptId(appId, 1);
  ContainerId cId = BuilderUtils.newContainerId(appAttemptId, 1);
  Container container = Container.newInstance(cId, null, null,
      Resources.createResource(GB, 1), null, null);
  node.addNewContainer(container, 100000L, appId);
  Assert.assertTrue("Node should have app: "
      + appId + " in runningApps list.",
      node.getNode().getRunningApps().contains(appId));

  // Finishing the app on the node must remove it again.
  node.finishApplication(appId);
  Assert.assertFalse("Node should not have app: "
      + appId + " in runningApps list.",
      node.getNode().getRunningApps().contains(appId));
  Assert.assertEquals("Node should have no runningApps.",
      0, node.getNode().getRunningApps().size());
}
/**
 * Verifies the null-safety of the runningApps bookkeeping: launching a
 * container with a null application id must not register anything, and
 * finishing an application that was never added must be a no-op.
 */
@Test
public void testNMSimNullAppAddedAndRemoved() throws Exception {
  // Register one node and wait until the RM has admitted it and its
  // resources are visible to the scheduler.
  NMSimulator node = new NMSimulator();
  node.init("/rack1/node1", Resources.createResource(GB * 10, 10), 0, 1000,
      rm, -1f);
  node.middleStep();
  int numClusterNodes = rm.getResourceScheduler().getNumClusterNodes();
  int cumulativeSleepTime = 0;
  int sleepInterval = 100;
  while (numClusterNodes != 1 && cumulativeSleepTime < 5000) {
    Thread.sleep(sleepInterval);
    cumulativeSleepTime = cumulativeSleepTime + sleepInterval;
    numClusterNodes = rm.getResourceScheduler().getNumClusterNodes();
  }
  GenericTestUtils.waitFor(new com.google.common.base.Supplier<Boolean>() {
    @Override
    public Boolean get() {
      return rm.getResourceScheduler().getRootQueueMetrics()
          .getAvailableMB() > 0;
    }
  }, 500, 10000);
  // JUnit's assertEquals signature is (message, expected, actual):
  // the expected value goes first so failure messages read correctly.
  Assert.assertEquals("Node should have no runningApps.",
      0, node.getNode().getRunningApps().size());

  // Launch a container with a null app id; runningApps must stay empty.
  ContainerId cId = newContainerId(1, 1, 1);
  Container container = Container.newInstance(cId, null, null,
      Resources.createResource(GB, 1), null, null);
  node.addNewContainer(container, 100000L, null);
  Assert.assertEquals("Node should have no runningApps if appId is null.",
      0, node.getNode().getRunningApps().size());

  // Finishing an app that never ran here must be a silent no-op.
  ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
  node.finishApplication(appId);
  Assert.assertEquals("Node should have no runningApps.",
      0, node.getNode().getRunningApps().size());
}
@After @After
public void tearDown() throws Exception { public void tearDown() throws Exception {
rm.stop(); rm.stop();