YARN-3826. Race condition in ResourceTrackerService leads to wrong diagnostics messages. Contributed by Chengbing Liu.
This commit is contained in:
Devaraj K 2015-06-25 16:13:59 +05:30
parent a815cc157c
commit 57f1a01eda
3 changed files with 18 additions and 19 deletions

View File

@ -553,6 +553,9 @@ Release 2.8.0 - UNRELEASED
YARN-3790. usedResource from rootQueue metrics may get stale data for FS
scheduler after recovering the container (Zhihai Xu via rohithsharmaks)
YARN-3826. Race condition in ResourceTrackerService leads to
wrong diagnostics messages. (Chengbing Liu via devaraj)
Release 2.7.1 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -22,13 +22,11 @@
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.SerializedException;
import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.util.Records;
/** /**
* Server Builder utilities to construct various objects. * Server Builder utilities to construct various objects.
@ -39,6 +37,15 @@ public class YarnServerBuilderUtils {
private static final RecordFactory recordFactory = RecordFactoryProvider private static final RecordFactory recordFactory = RecordFactoryProvider
.getRecordFactory(null); .getRecordFactory(null);
/**
 * Builds a {@link NodeHeartbeatResponse} that carries only a node action and
 * a diagnostics message.
 *
 * Each call asks {@code recordFactory} for its own record instance and sets
 * both fields on it, rather than updating a response object held in a
 * shared field.
 *
 * @param action the node action to set on the response
 * @param diagnosticsMessage the diagnostics text to set on the response
 * @return the response obtained from {@code recordFactory}, with the action
 *         and diagnostics message set
 */
public static NodeHeartbeatResponse newNodeHeartbeatResponse(
NodeAction action, String diagnosticsMessage) {
NodeHeartbeatResponse response = recordFactory
.newRecordInstance(NodeHeartbeatResponse.class);
response.setNodeAction(action);
response.setDiagnosticsMessage(diagnosticsMessage);
return response;
}
public static NodeHeartbeatResponse newNodeHeartbeatResponse(int responseId, public static NodeHeartbeatResponse newNodeHeartbeatResponse(int responseId,
NodeAction action, List<ContainerId> containersToCleanUp, NodeAction action, List<ContainerId> containersToCleanUp,
List<ApplicationId> applicationsToCleanUp, List<ApplicationId> applicationsToCleanUp,

View File

@ -100,22 +100,11 @@ public class ResourceTrackerService extends AbstractService implements
private InetSocketAddress resourceTrackerAddress; private InetSocketAddress resourceTrackerAddress;
private String minimumNodeManagerVersion; private String minimumNodeManagerVersion;
private static final NodeHeartbeatResponse resync = recordFactory
.newRecordInstance(NodeHeartbeatResponse.class);
private static final NodeHeartbeatResponse shutDown = recordFactory
.newRecordInstance(NodeHeartbeatResponse.class);
private int minAllocMb; private int minAllocMb;
private int minAllocVcores; private int minAllocVcores;
private boolean isDistributedNodeLabelsConf; private boolean isDistributedNodeLabelsConf;
static {
resync.setNodeAction(NodeAction.RESYNC);
shutDown.setNodeAction(NodeAction.SHUTDOWN);
}
public ResourceTrackerService(RMContext rmContext, public ResourceTrackerService(RMContext rmContext,
NodesListManager nodesListManager, NodesListManager nodesListManager,
NMLivelinessMonitor nmLivelinessMonitor, NMLivelinessMonitor nmLivelinessMonitor,
@ -414,8 +403,8 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
"Disallowed NodeManager nodeId: " + nodeId + " hostname: " "Disallowed NodeManager nodeId: " + nodeId + " hostname: "
+ nodeId.getHost(); + nodeId.getHost();
LOG.info(message); LOG.info(message);
shutDown.setDiagnosticsMessage(message); return YarnServerBuilderUtils.newNodeHeartbeatResponse(
return shutDown; NodeAction.SHUTDOWN, message);
} }
// 2. Check if it's a registered node // 2. Check if it's a registered node
@ -424,8 +413,8 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
/* node does not exist */ /* node does not exist */
String message = "Node not found resyncing " + remoteNodeStatus.getNodeId(); String message = "Node not found resyncing " + remoteNodeStatus.getNodeId();
LOG.info(message); LOG.info(message);
resync.setDiagnosticsMessage(message); return YarnServerBuilderUtils.newNodeHeartbeatResponse(NodeAction.RESYNC,
return resync; message);
} }
// Send ping // Send ping
@ -445,11 +434,11 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
+ lastNodeHeartbeatResponse.getResponseId() + " nm response id:" + lastNodeHeartbeatResponse.getResponseId() + " nm response id:"
+ remoteNodeStatus.getResponseId(); + remoteNodeStatus.getResponseId();
LOG.info(message); LOG.info(message);
resync.setDiagnosticsMessage(message);
// TODO: Just sending reboot is not enough. Think more. // TODO: Just sending reboot is not enough. Think more.
this.rmContext.getDispatcher().getEventHandler().handle( this.rmContext.getDispatcher().getEventHandler().handle(
new RMNodeEvent(nodeId, RMNodeEventType.REBOOTING)); new RMNodeEvent(nodeId, RMNodeEventType.REBOOTING));
return resync; return YarnServerBuilderUtils.newNodeHeartbeatResponse(NodeAction.RESYNC,
message);
} }
// Heartbeat response // Heartbeat response