YARN-3896. RMNode transitioned from RUNNING to REBOOTED because its response id has not been reset synchronously. (Jun Gong via rohithsharmaks)

This commit is contained in:
Rohith Sharma K S 2015-08-24 11:25:07 +05:30
parent bcaf83902a
commit feaf034994
8 changed files with 72 additions and 6 deletions

View File

@ -149,6 +149,9 @@ public NodeHeartbeatResponse getLastNodeHeartBeatResponse() {
return null;
}
public void resetLastNodeHeartBeatResponse() {
}
public List<UpdatedContainerInfo> pullContainerUpdates() {
ArrayList<UpdatedContainerInfo> list = new ArrayList<UpdatedContainerInfo>();

View File

@ -134,6 +134,11 @@ public NodeHeartbeatResponse getLastNodeHeartBeatResponse() {
return node.getLastNodeHeartBeatResponse();
}
@Override
public void resetLastNodeHeartBeatResponse() {
node.getLastNodeHeartBeatResponse().setResponseId(0);
}
@Override
@SuppressWarnings("unchecked")
public List<UpdatedContainerInfo> pullContainerUpdates() {

View File

@ -792,6 +792,9 @@ Release 2.8.0 - UNRELEASED
YARN-3986. getTransferredContainers in AbstractYarnScheduler should be present
in YarnScheduler interface instead. (Varun Saxena via rohithsharmaks)
YARN-3896. RMNode transitioned from RUNNING to REBOOTED because its response id
has not been reset synchronously. (Jun Gong via rohithsharmaks)
Release 2.7.2 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -325,6 +325,8 @@ public RegisterNodeManagerResponse registerNodeManager(
} else {
LOG.info("Reconnect from the node at: " + host);
this.nmLivelinessMonitor.unregister(nodeId);
// Reset heartbeat ID since node just restarted.
oldNode.resetLastNodeHeartBeatResponse();
this.rmContext
.getDispatcher()
.getEventHandler()

View File

@ -130,6 +130,11 @@ public interface RMNode {
public NodeHeartbeatResponse getLastNodeHeartBeatResponse();
/**
* Reset lastNodeHeartbeatResponse's ID to 0.
*/
void resetLastNodeHeartBeatResponse();
/**
* Get and clear the list of containerUpdates accumulated across NM
* heartbeats.

View File

@ -443,6 +443,16 @@ public NodeHeartbeatResponse getLastNodeHeartBeatResponse() {
}
}
@Override
public void resetLastNodeHeartBeatResponse() {
this.writeLock.lock();
try {
latestNodeHeartBeatResponse.setResponseId(0);
} finally {
this.writeLock.unlock();
}
}
public void handle(RMNodeEvent event) {
LOG.debug("Processing " + event.getNodeId() + " of type " + event.getType());
try {
@ -617,8 +627,6 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
new NodeRemovedSchedulerEvent(rmNode));
if (rmNode.getHttpPort() == newNode.getHttpPort()) {
// Reset heartbeat ID since node just restarted.
rmNode.getLastNodeHeartBeatResponse().setResponseId(0);
if (!rmNode.getTotalCapability().equals(
newNode.getTotalCapability())) {
rmNode.totalCapability = newNode.getTotalCapability();
@ -656,9 +664,6 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
handleNMContainerStatus(reconnectEvent.getNMContainerStatuses(), rmNode);
// Reset heartbeat ID since node just restarted.
rmNode.getLastNodeHeartBeatResponse().setResponseId(0);
for (ApplicationId appId : reconnectEvent.getRunningApplications()) {
handleRunningAppOnNode(rmNode, rmNode.context, appId, rmNode.nodeId);
}

View File

@ -200,6 +200,10 @@ public NodeHeartbeatResponse getLastNodeHeartBeatResponse() {
return null;
}
@Override
public void resetLastNodeHeartBeatResponse() {
}
@Override
public String getNodeManagerVersion() {
return null;

View File

@ -21,6 +21,11 @@
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.junit.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.NodeId;
@ -189,4 +194,38 @@ public void testCompareRMNodeAfterReconnect() throws Exception {
nlm.stop();
scheduler.stop();
}
@Test(timeout = 10000)
public void testRMNodeStatusAfterReconnect() throws Exception {
// The node(127.0.0.1:1234) reconnected with RM. When it registered with
// RM, RM set its lastNodeHeartbeatResponse's id to 0 asynchronously. But
// the node's heartbeat come before RM succeeded setting the id to 0.
final DrainDispatcher dispatcher = new DrainDispatcher();
MockRM rm = new MockRM(){
@Override
protected Dispatcher createDispatcher() {
return dispatcher;
}
};
rm.start();
MockNM nm1 =
new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService());
nm1.registerNode();
int i = 0;
while(i < 3) {
nm1.nodeHeartbeat(true);
dispatcher.await();
i++;
}
MockNM nm2 =
new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService());
nm2.registerNode();
RMNode rmNode = rm.getRMContext().getRMNodes().get(nm2.getNodeId());
nm2.nodeHeartbeat(true);
dispatcher.await();
Assert.assertEquals("Node is Not in Running state.", NodeState.RUNNING,
rmNode.getState());
rm.stop();
}
}