YARN-11274. Improve Nodemanager#NodeStatusUpdaterImpl Log. (#4783). Contributed by fanshilun.
Signed-off-by: Ayush Saxena <ayushsaxena@apache.org>
This commit is contained in:
parent
e77d54d1ee
commit
b2760520c3
@ -201,7 +201,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
|||||||
// Update configured resources via plugins.
|
// Update configured resources via plugins.
|
||||||
updateConfiguredResourcesViaPlugins(totalResource);
|
updateConfiguredResourcesViaPlugins(totalResource);
|
||||||
|
|
||||||
LOG.info("Nodemanager resources is set to: " + totalResource);
|
LOG.info("Nodemanager resources is set to: {}.", totalResource);
|
||||||
|
|
||||||
metrics.addResource(totalResource);
|
metrics.addResource(totalResource);
|
||||||
|
|
||||||
@ -247,9 +247,8 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
|||||||
LOG.debug("{} :{}", YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS,
|
LOG.debug("{} :{}", YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS,
|
||||||
durationToTrackStoppedContainers);
|
durationToTrackStoppedContainers);
|
||||||
super.serviceInit(conf);
|
super.serviceInit(conf);
|
||||||
LOG.info("Initialized nodemanager with :" +
|
LOG.info("Initialized nodemanager with : physical-memory={} virtual-memory={} " +
|
||||||
" physical-memory=" + memoryMb + " virtual-memory=" + virtualMemoryMb +
|
"virtual-cores={}.", memoryMb, virtualMemoryMb, virtualCores);
|
||||||
" virtual-cores=" + virtualCores);
|
|
||||||
|
|
||||||
this.logAggregationEnabled =
|
this.logAggregationEnabled =
|
||||||
conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
|
conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
|
||||||
@ -264,7 +263,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
|||||||
|
|
||||||
// NodeManager is the last service to start, so NodeId is available.
|
// NodeManager is the last service to start, so NodeId is available.
|
||||||
this.nodeId = this.context.getNodeId();
|
this.nodeId = this.context.getNodeId();
|
||||||
LOG.info("Node ID assigned is : " + this.nodeId);
|
LOG.info("Node ID assigned is : {}.", this.nodeId);
|
||||||
this.httpPort = this.context.getHttpPort();
|
this.httpPort = this.context.getHttpPort();
|
||||||
this.nodeManagerVersionId = YarnVersionInfo.getVersion();
|
this.nodeManagerVersionId = YarnVersionInfo.getVersion();
|
||||||
try {
|
try {
|
||||||
@ -312,10 +311,9 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
|||||||
request.setNodeId(this.nodeId);
|
request.setNodeId(this.nodeId);
|
||||||
try {
|
try {
|
||||||
resourceTracker.unRegisterNodeManager(request);
|
resourceTracker.unRegisterNodeManager(request);
|
||||||
LOG.info("Successfully Unregistered the Node " + this.nodeId
|
LOG.info("Successfully Unregistered the Node {} with ResourceManager.", this.nodeId);
|
||||||
+ " with ResourceManager.");
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LOG.warn("Unregistration of the Node " + this.nodeId + " failed.", e);
|
LOG.warn("Unregistration of the Node {} failed.", this.nodeId, e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -399,7 +397,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
|||||||
nodeLabels, physicalResource, nodeAttributes, nodeStatus);
|
nodeLabels, physicalResource, nodeAttributes, nodeStatus);
|
||||||
|
|
||||||
if (containerReports != null && !containerReports.isEmpty()) {
|
if (containerReports != null && !containerReports.isEmpty()) {
|
||||||
LOG.info("Registering with RM using containers :" + containerReports);
|
// Pass the count as a logger argument (not string concatenation) so the {} placeholder is substituted.
LOG.info("Registering with RM using containers.size : {}.", containerReports.size());
|
||||||
}
|
}
|
||||||
if (logAggregationEnabled) {
|
if (logAggregationEnabled) {
|
||||||
// pull log aggregation status for application running in this NM
|
// pull log aggregation status for application running in this NM
|
||||||
@ -641,6 +639,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
|||||||
runningApplications.add(appEntry.getKey());
|
runningApplications.add(appEntry.getKey());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
LOG.info("Running Applications Size : {}.", runningApplications.size());
|
||||||
return runningApplications;
|
return runningApplications;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -667,8 +666,8 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (!containerStatuses.isEmpty()) {
|
if (!containerStatuses.isEmpty()) {
|
||||||
LOG.info("Sending out " + containerStatuses.size()
|
LOG.info("Sending out {} container NM container statuses: {}.",
|
||||||
+ " NM container statuses: " + containerStatuses);
|
containerStatuses.size(), containerStatuses);
|
||||||
}
|
}
|
||||||
return containerStatuses;
|
return containerStatuses;
|
||||||
}
|
}
|
||||||
@ -724,8 +723,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (!removedContainers.isEmpty()) {
|
if (!removedContainers.isEmpty()) {
|
||||||
LOG.info("Removed completed containers from NM context: "
|
LOG.info("Removed completed containers from NM context: {}.", removedContainers);
|
||||||
+ removedContainers);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -792,7 +790,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
|||||||
try {
|
try {
|
||||||
context.getNMStateStore().removeContainer(cid);
|
context.getNMStateStore().removeContainer(cid);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
LOG.error("Unable to remove container " + cid + " in store", e);
|
LOG.error("Unable to remove container {} in store.", cid, e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -839,18 +837,15 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
|||||||
if (response.getNodeAction() == NodeAction.SHUTDOWN) {
|
if (response.getNodeAction() == NodeAction.SHUTDOWN) {
|
||||||
LOG.warn("Received SHUTDOWN signal from Resourcemanager as part of"
|
LOG.warn("Received SHUTDOWN signal from Resourcemanager as part of"
|
||||||
+ " heartbeat, hence shutting down.");
|
+ " heartbeat, hence shutting down.");
|
||||||
LOG.warn("Message from ResourceManager: "
|
LOG.warn("Message from ResourceManager: {}.", response.getDiagnosticsMessage());
|
||||||
+ response.getDiagnosticsMessage());
|
|
||||||
context.setDecommissioned(true);
|
context.setDecommissioned(true);
|
||||||
dispatcher.getEventHandler().handle(
|
dispatcher.getEventHandler().handle(
|
||||||
new NodeManagerEvent(NodeManagerEventType.SHUTDOWN));
|
new NodeManagerEvent(NodeManagerEventType.SHUTDOWN));
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
if (response.getNodeAction() == NodeAction.RESYNC) {
|
if (response.getNodeAction() == NodeAction.RESYNC) {
|
||||||
LOG.warn("Node is out of sync with ResourceManager,"
|
LOG.warn("Node is out of sync with ResourceManager, hence resyncing.");
|
||||||
+ " hence resyncing.");
|
LOG.warn("Message from ResourceManager: {}.", response.getDiagnosticsMessage());
|
||||||
LOG.warn("Message from ResourceManager: "
|
|
||||||
+ response.getDiagnosticsMessage());
|
|
||||||
// Invalidate the RMIdentifier while resync
|
// Invalidate the RMIdentifier while resync
|
||||||
NodeStatusUpdaterImpl.this.rmIdentifier =
|
NodeStatusUpdaterImpl.this.rmIdentifier =
|
||||||
ResourceManagerConstants.RM_INVALID_IDENTIFIER;
|
ResourceManagerConstants.RM_INVALID_IDENTIFIER;
|
||||||
@ -1095,8 +1090,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
|||||||
try {
|
try {
|
||||||
NodeLabelUtil.validateNodeAttributes(nodeAttributes);
|
NodeLabelUtil.validateNodeAttributes(nodeAttributes);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
LOG.error(
|
LOG.error("Invalid node attribute(s) from Provider : {}.", e.getMessage());
|
||||||
"Invalid node attribute(s) from Provider : " + e.getMessage());
|
|
||||||
throw e;
|
throw e;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1136,9 +1130,8 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
|||||||
} else {
|
} else {
|
||||||
// case where updated node attributes from NodeAttributesProvider
|
// case where updated node attributes from NodeAttributesProvider
|
||||||
// is sent to RM and RM rejected the attributes
|
// is sent to RM and RM rejected the attributes
|
||||||
LOG.error("NM node attributes {" + getPreviousValue()
|
LOG.error("NM node attributes [{}] were not accepted by RM and message from RM : {}.",
|
||||||
+ "} were not accepted by RM and message from RM : " + response
|
getPreviousValue(), response.getDiagnosticsMessage());
|
||||||
.getDiagnosticsMessage());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1262,7 +1255,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (hasInvalidLabel) {
|
if (hasInvalidLabel) {
|
||||||
LOG.error("Invalid Node Label(s) from Provider : " + errorMsg);
|
LOG.error("Invalid Node Label(s) from Provider : {}.", errorMsg);
|
||||||
throw new IOException(errorMsg.toString());
|
throw new IOException(errorMsg.toString());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1287,10 +1280,8 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
|||||||
} else {
|
} else {
|
||||||
// case where updated labels from NodeLabelsProvider is sent to RM and
|
// case where updated labels from NodeLabelsProvider is sent to RM and
|
||||||
// RM rejected the labels
|
// RM rejected the labels
|
||||||
LOG.error(
|
LOG.error("NM node labels [{}] were not accepted by RM and message from RM : {}.",
|
||||||
"NM node labels {" + StringUtils.join(",", getPreviousValue())
|
StringUtils.join(",", getPreviousValue()), response.getDiagnosticsMessage());
|
||||||
+ "} were not accepted by RM and message from RM : "
|
|
||||||
+ response.getDiagnosticsMessage());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -181,7 +181,7 @@ public class TestNodeStatusUpdater extends NodeManagerTestBase {
|
|||||||
IOException {
|
IOException {
|
||||||
NodeId nodeId = request.getNodeId();
|
NodeId nodeId = request.getNodeId();
|
||||||
Resource resource = request.getResource();
|
Resource resource = request.getResource();
|
||||||
LOG.info("Registering " + nodeId.toString());
|
LOG.info("Registering {}.", nodeId.toString());
|
||||||
// NOTE: this really should be checking against the config value
|
// NOTE: this really should be checking against the config value
|
||||||
InetSocketAddress expected = NetUtils.getConnectAddress(
|
InetSocketAddress expected = NetUtils.getConnectAddress(
|
||||||
conf.getSocketAddr(YarnConfiguration.NM_ADDRESS, null, -1));
|
conf.getSocketAddr(YarnConfiguration.NM_ADDRESS, null, -1));
|
||||||
@ -217,7 +217,7 @@ public class TestNodeStatusUpdater extends NodeManagerTestBase {
|
|||||||
public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
|
public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
|
||||||
throws YarnException, IOException {
|
throws YarnException, IOException {
|
||||||
NodeStatus nodeStatus = request.getNodeStatus();
|
NodeStatus nodeStatus = request.getNodeStatus();
|
||||||
LOG.info("Got heartbeat number " + heartBeatID);
|
LOG.info("Got heartbeat number {}.", heartBeatID);
|
||||||
NodeManagerMetrics mockMetrics = mock(NodeManagerMetrics.class);
|
NodeManagerMetrics mockMetrics = mock(NodeManagerMetrics.class);
|
||||||
Dispatcher mockDispatcher = mock(Dispatcher.class);
|
Dispatcher mockDispatcher = mock(Dispatcher.class);
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
@ -625,7 +625,7 @@ public class TestNodeStatusUpdater extends NodeManagerTestBase {
|
|||||||
@Override
|
@Override
|
||||||
public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
|
public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
|
||||||
throws YarnException, IOException {
|
throws YarnException, IOException {
|
||||||
LOG.info("Got heartBeatId: [" + heartBeatID +"]");
|
LOG.info("Got heartBeatId: [{}]", heartBeatID);
|
||||||
NodeStatus nodeStatus = request.getNodeStatus();
|
NodeStatus nodeStatus = request.getNodeStatus();
|
||||||
nodeStatus.setResponseId(heartBeatID.getAndIncrement());
|
nodeStatus.setResponseId(heartBeatID.getAndIncrement());
|
||||||
NodeHeartbeatResponse nhResponse = YarnServerBuilderUtils.
|
NodeHeartbeatResponse nhResponse = YarnServerBuilderUtils.
|
||||||
@ -644,7 +644,7 @@ public class TestNodeStatusUpdater extends NodeManagerTestBase {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (heartBeatID.get() == 2) {
|
if (heartBeatID.get() == 2) {
|
||||||
LOG.info("Sending FINISH_APP for application: [" + appId + "]");
|
LOG.info("Sending FINISH_APP for application: [{}]", appId);
|
||||||
this.context.getApplications().put(appId, mock(Application.class));
|
this.context.getApplications().put(appId, mock(Application.class));
|
||||||
nhResponse.addAllApplicationsToCleanup(Collections.singletonList(appId));
|
nhResponse.addAllApplicationsToCleanup(Collections.singletonList(appId));
|
||||||
}
|
}
|
||||||
@ -1528,7 +1528,7 @@ public class TestNodeStatusUpdater extends NodeManagerTestBase {
|
|||||||
rt.context.getApplications().remove(rt.appId);
|
rt.context.getApplications().remove(rt.appId);
|
||||||
Assert.assertEquals(1, rt.keepAliveRequests.size());
|
Assert.assertEquals(1, rt.keepAliveRequests.size());
|
||||||
int numKeepAliveRequests = rt.keepAliveRequests.get(rt.appId).size();
|
int numKeepAliveRequests = rt.keepAliveRequests.get(rt.appId).size();
|
||||||
LOG.info("Number of Keep Alive Requests: [" + numKeepAliveRequests + "]");
|
LOG.info("Number of Keep Alive Requests: [{}]", numKeepAliveRequests);
|
||||||
Assert.assertTrue(numKeepAliveRequests == 2 || numKeepAliveRequests == 3);
|
Assert.assertTrue(numKeepAliveRequests == 2 || numKeepAliveRequests == 3);
|
||||||
GenericTestUtils.waitFor(
|
GenericTestUtils.waitFor(
|
||||||
() -> nm.getServiceState() != STATE.STARTED
|
() -> nm.getServiceState() != STATE.STARTED
|
||||||
|
Loading…
x
Reference in New Issue
Block a user