YARN-9618. NodesListManager event improvement. Contributed by Qi Zhu.
This commit is contained in:
parent
a0deda1a77
commit
9f1655baf2
@ -509,12 +509,8 @@ private void sendRMAppNodeUpdateEventToNonFinalizedApps(
|
|||||||
RMNode eventNode, RMAppNodeUpdateType appNodeUpdateType) {
|
RMNode eventNode, RMAppNodeUpdateType appNodeUpdateType) {
|
||||||
for(RMApp app : rmContext.getRMApps().values()) {
|
for(RMApp app : rmContext.getRMApps().values()) {
|
||||||
if (!app.isAppFinalStateStored()) {
|
if (!app.isAppFinalStateStored()) {
|
||||||
this.rmContext
|
app.handle(new RMAppNodeUpdateEvent(app.getApplicationId(), eventNode,
|
||||||
.getDispatcher()
|
appNodeUpdateType));
|
||||||
.getEventHandler()
|
|
||||||
.handle(
|
|
||||||
new RMAppNodeUpdateEvent(app.getApplicationId(), eventNode,
|
|
||||||
appNodeUpdateType));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -22,10 +22,7 @@
|
|||||||
import static org.mockito.Mockito.doNothing;
|
import static org.mockito.Mockito.doNothing;
|
||||||
import static org.mockito.Mockito.spy;
|
import static org.mockito.Mockito.spy;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
|
||||||
|
|
||||||
import org.apache.hadoop.test.GenericTestUtils;
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
@ -52,8 +49,8 @@
|
|||||||
import org.mockito.ArgumentMatcher;
|
import org.mockito.ArgumentMatcher;
|
||||||
|
|
||||||
public class TestNodesListManager {
|
public class TestNodesListManager {
|
||||||
// To hold list of application for which event was received
|
private boolean isRMAppEvent;
|
||||||
ArrayList<ApplicationId> applist = new ArrayList<ApplicationId>();
|
private boolean isNodesListEvent;
|
||||||
|
|
||||||
@Test(timeout = 300000)
|
@Test(timeout = 300000)
|
||||||
public void testNodeUsableEvent() throws Exception {
|
public void testNodeUsableEvent() throws Exception {
|
||||||
@ -68,67 +65,32 @@ protected Dispatcher createDispatcher() {
|
|||||||
};
|
};
|
||||||
rm.start();
|
rm.start();
|
||||||
MockNM nm1 = rm.registerNode("h1:1234", 28000);
|
MockNM nm1 = rm.registerNode("h1:1234", 28000);
|
||||||
NodesListManager nodesListManager = rm.getNodesListManager();
|
|
||||||
Resource clusterResource = Resource.newInstance(28000, 8);
|
Resource clusterResource = Resource.newInstance(28000, 8);
|
||||||
RMNode rmnode = MockNodes.newNodeInfo(1, clusterResource);
|
RMNode rmnode = MockNodes.newNodeInfo(1, clusterResource);
|
||||||
|
|
||||||
// Create killing APP
|
// Create killing APP
|
||||||
RMApp killrmApp = MockRMAppSubmitter.submitWithMemory(200, rm);
|
RMApp killRmApp = MockRMAppSubmitter.submitWithMemory(200, rm);
|
||||||
rm.killApp(killrmApp.getApplicationId());
|
rm.killApp(killRmApp.getApplicationId());
|
||||||
rm.waitForState(killrmApp.getApplicationId(), RMAppState.KILLED);
|
rm.waitForState(killRmApp.getApplicationId(), RMAppState.KILLED);
|
||||||
|
|
||||||
// Create finish APP
|
// Create finish APP
|
||||||
RMApp finshrmApp = MockRMAppSubmitter.submitWithMemory(2000, rm);
|
RMApp finshRmApp = MockRMAppSubmitter.submitWithMemory(2000, rm);
|
||||||
nm1.nodeHeartbeat(true);
|
nm1.nodeHeartbeat(true);
|
||||||
RMAppAttempt attempt = finshrmApp.getCurrentAppAttempt();
|
RMAppAttempt attempt = finshRmApp.getCurrentAppAttempt();
|
||||||
MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
|
MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
|
||||||
am.registerAppAttempt();
|
am.registerAppAttempt();
|
||||||
am.unregisterAppAttempt();
|
am.unregisterAppAttempt();
|
||||||
nm1.nodeHeartbeat(attempt.getAppAttemptId(), 1, ContainerState.COMPLETE);
|
nm1.nodeHeartbeat(attempt.getAppAttemptId(), 1, ContainerState.COMPLETE);
|
||||||
rm.waitForState(am.getApplicationAttemptId(), RMAppAttemptState.FINISHED);
|
rm.waitForState(am.getApplicationAttemptId(), RMAppAttemptState.FINISHED);
|
||||||
|
|
||||||
// Create submitted App
|
|
||||||
RMApp subrmApp = MockRMAppSubmitter.submitWithMemory(200, rm);
|
|
||||||
|
|
||||||
// Fire Event for NODE_USABLE
|
// Fire Event for NODE_USABLE
|
||||||
nodesListManager.handle(new NodesListManagerEvent(
|
// Should not have RMAppNodeUpdateEvent to AsyncDispatcher.
|
||||||
|
dispatcher.getEventHandler().handle(new NodesListManagerEvent(
|
||||||
NodesListManagerEventType.NODE_USABLE, rmnode));
|
NodesListManagerEventType.NODE_USABLE, rmnode));
|
||||||
if (applist.size() > 0) {
|
Assert.assertFalse("Got unexpected RM app event",
|
||||||
Assert.assertTrue(
|
getIsRMAppEvent());
|
||||||
"Event based on running app expected " + subrmApp.getApplicationId(),
|
Assert.assertTrue("Received no NodesListManagerEvent",
|
||||||
applist.contains(subrmApp.getApplicationId()));
|
getIsNodesListEvent());
|
||||||
Assert.assertFalse(
|
|
||||||
"Event based on finish app not expected "
|
|
||||||
+ finshrmApp.getApplicationId(),
|
|
||||||
applist.contains(finshrmApp.getApplicationId()));
|
|
||||||
Assert.assertFalse(
|
|
||||||
"Event based on killed app not expected "
|
|
||||||
+ killrmApp.getApplicationId(),
|
|
||||||
applist.contains(killrmApp.getApplicationId()));
|
|
||||||
} else {
|
|
||||||
Assert.fail("Events received should have beeen more than 1");
|
|
||||||
}
|
|
||||||
applist.clear();
|
|
||||||
|
|
||||||
// Fire Event for NODE_UNUSABLE
|
|
||||||
nodesListManager.handle(new NodesListManagerEvent(
|
|
||||||
NodesListManagerEventType.NODE_UNUSABLE, rmnode));
|
|
||||||
if (applist.size() > 0) {
|
|
||||||
Assert.assertTrue(
|
|
||||||
"Event based on running app expected " + subrmApp.getApplicationId(),
|
|
||||||
applist.contains(subrmApp.getApplicationId()));
|
|
||||||
Assert.assertFalse(
|
|
||||||
"Event based on finish app not expected "
|
|
||||||
+ finshrmApp.getApplicationId(),
|
|
||||||
applist.contains(finshrmApp.getApplicationId()));
|
|
||||||
Assert.assertFalse(
|
|
||||||
"Event based on killed app not expected "
|
|
||||||
+ killrmApp.getApplicationId(),
|
|
||||||
applist.contains(killrmApp.getApplicationId()));
|
|
||||||
} else {
|
|
||||||
Assert.fail("Events received should have beeen more than 1");
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -241,9 +203,10 @@ class EventArgMatcher implements ArgumentMatcher<AbstractEvent> {
|
|||||||
@Override
|
@Override
|
||||||
public boolean matches(AbstractEvent argument) {
|
public boolean matches(AbstractEvent argument) {
|
||||||
if (argument instanceof RMAppNodeUpdateEvent) {
|
if (argument instanceof RMAppNodeUpdateEvent) {
|
||||||
ApplicationId appid =
|
isRMAppEvent = true;
|
||||||
((RMAppNodeUpdateEvent) argument).getApplicationId();
|
}
|
||||||
applist.add(appid);
|
if (argument instanceof NodesListManagerEvent) {
|
||||||
|
isNodesListEvent = true;
|
||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
@ -256,4 +219,11 @@ public boolean matches(AbstractEvent argument) {
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean getIsNodesListEvent() {
|
||||||
|
return isNodesListEvent;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean getIsRMAppEvent() {
|
||||||
|
return isRMAppEvent;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user