diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 85d5a58036..54e8888f0d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -2013,6 +2013,13 @@ public class YarnConfiguration extends Configuration { NM_PREFIX + "health-checker.interval-ms"; public static final long DEFAULT_NM_HEALTH_CHECK_INTERVAL_MS = 10 * 60 * 1000; + /** Whether or not to run the node health script before the NM + * starts up.*/ + public static final String NM_HEALTH_CHECK_RUN_BEFORE_STARTUP = + NM_PREFIX + "health-checker.run-before-startup"; + public static final boolean DEFAULT_NM_HEALTH_CHECK_RUN_BEFORE_STARTUP = + false; + /** Health check time out period for all scripts.*/ public static final String NM_HEALTH_CHECK_TIMEOUT_MS = NM_PREFIX + "health-checker.timeout-ms"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index f09186ecf4..2f97a7cce7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -1668,6 +1668,13 @@ 1200000 + + Whether or not to run the node health script + before the NM starts up. + yarn.nodemanager.health-checker.run-before-startup + false + + Frequency of running node health scripts. yarn.nodemanager.health-checker.interval-ms diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java index acec16fd56..54b39155c6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java @@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.api.records.NodeAttribute; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeLabel; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.util.Records; public abstract class RegisterNodeManagerRequest { @@ -53,14 +54,15 @@ public abstract class RegisterNodeManagerRequest { Resource physicalResource) { return newInstance(nodeId, httpPort, resource, nodeManagerVersionId, containerStatuses, runningApplications, nodeLabels, physicalResource, - null); + null, null); } public static RegisterNodeManagerRequest newInstance(NodeId nodeId, int httpPort, Resource resource, String nodeManagerVersionId, List containerStatuses, List runningApplications, Set nodeLabels, - Resource physicalResource, Set nodeAttributes) { + Resource physicalResource, Set nodeAttributes, + NodeStatus nodeStatus) { RegisterNodeManagerRequest request = Records.newRecord(RegisterNodeManagerRequest.class); request.setHttpPort(httpPort); @@ -72,6 +74,7 @@ public abstract class RegisterNodeManagerRequest { request.setNodeLabels(nodeLabels); request.setPhysicalResource(physicalResource); request.setNodeAttributes(nodeAttributes); + request.setNodeStatus(nodeStatus); return request; } @@ -133,4 +136,16 @@ public abstract class RegisterNodeManagerRequest { public abstract Set getNodeAttributes(); public abstract void setNodeAttributes(Set nodeAttributes); + + /** + * Get the status of the node. + * @return The status of the node. + */ + public abstract NodeStatus getNodeStatus(); + + /** + * Set the status of the node. + * @param nodeStatus The status of the node. + */ + public abstract void setNodeStatus(NodeStatus nodeStatus); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java index 317f8abd6f..d91cff2531 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java @@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.NodeLabelProto; import org.apache.hadoop.yarn.proto.YarnProtos.NodeAttributeProto; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NMContainerStatusProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeLabelsProto; @@ -51,7 +52,9 @@ import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeMa import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; - +import org.apache.hadoop.yarn.server.api.records.NodeStatus; +import org.apache.hadoop.yarn.server.api.records.impl.pb.NodeStatusPBImpl; + public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest { RegisterNodeManagerRequestProto proto = RegisterNodeManagerRequestProto.getDefaultInstance(); RegisterNodeManagerRequestProto.Builder builder = null; @@ -68,6 +71,7 @@ public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest /** Physical resources in the node. */ private Resource physicalResource = null; + private NodeStatus nodeStatus; public RegisterNodeManagerRequestPBImpl() { builder = RegisterNodeManagerRequestProto.newBuilder(); @@ -121,6 +125,9 @@ public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest if (this.logAggregationReportsForApps != null) { addLogAggregationStatusForAppsToProto(); } + if (this.nodeStatus != null) { + builder.setNodeStatus(convertToProtoFormat(this.nodeStatus)); + } } private void addLogAggregationStatusForAppsToProto() { @@ -359,6 +366,28 @@ public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest this.physicalResource = pPhysicalResource; } + @Override + public synchronized NodeStatus getNodeStatus() { + RegisterNodeManagerRequestProtoOrBuilder p = viaProto ? proto : builder; + if (this.nodeStatus != null) { + return this.nodeStatus; + } + if (!p.hasNodeStatus()) { + return null; + } + this.nodeStatus = convertFromProtoFormat(p.getNodeStatus()); + return this.nodeStatus; + } + + @Override + public synchronized void setNodeStatus(NodeStatus pNodeStatus) { + maybeInitBuilder(); + if (pNodeStatus == null) { + builder.clearNodeStatus(); + } + this.nodeStatus = pNodeStatus; + } + @Override public int hashCode() { return getProto().hashCode(); @@ -533,4 +562,12 @@ public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest } this.logAggregationReportsForApps = logAggregationStatusForApps; } + + private NodeStatusPBImpl convertFromProtoFormat(NodeStatusProto s) { + return new NodeStatusPBImpl(s); + } + + private NodeStatusProto convertToProtoFormat(NodeStatus s) { + return ((NodeStatusPBImpl)s).getProto(); + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto index ff7153eca8..c643179888 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto @@ -74,6 +74,7 @@ message RegisterNodeManagerRequestProto { optional ResourceProto physicalResource = 9; repeated LogAggregationReportProto log_aggregation_reports_for_apps = 10; optional NodeAttributesProto nodeAttributes = 11; + optional NodeStatusProto nodeStatus = 12; } message RegisterNodeManagerResponseProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 5e3693ae9c..0725d42309 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -392,10 +392,11 @@ public class NodeStatusUpdaterImpl extends AbstractService implements // during RM recovery synchronized (this.context) { List containerReports = getNMContainerStatuses(); + NodeStatus nodeStatus = getNodeStatus(0); RegisterNodeManagerRequest request = RegisterNodeManagerRequest.newInstance(nodeId, httpPort, totalResource, nodeManagerVersionId, containerReports, getRunningApplications(), - nodeLabels, physicalResource, nodeAttributes); + nodeLabels, physicalResource, nodeAttributes, nodeStatus); if (containerReports != null) { LOG.info("Registering with RM using containers :" + containerReports); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/health/NodeHealthScriptRunner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/health/NodeHealthScriptRunner.java index 1c9bd82bd4..af92b15e9c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/health/NodeHealthScriptRunner.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/health/NodeHealthScriptRunner.java @@ -60,8 +60,9 @@ public class NodeHealthScriptRunner extends TimedHealthReporterService { "Node health script timed out"; private NodeHealthScriptRunner(String scriptName, long checkInterval, - long timeout, String[] scriptArgs) { - super(NodeHealthScriptRunner.class.getName(), checkInterval); + long timeout, String[] scriptArgs, boolean runBeforeStartup) { + super(NodeHealthScriptRunner.class.getName(), checkInterval, + runBeforeStartup); this.nodeHealthScript = scriptName; this.scriptTimeout = timeout; setTimerTask(new NodeHealthMonitorExecutor(scriptArgs)); @@ -91,6 +92,10 @@ public class NodeHealthScriptRunner extends TimedHealthReporterService { "interval-ms can not be set to a negative number."); } + boolean runBeforeStartup = conf.getBoolean( + YarnConfiguration.NM_HEALTH_CHECK_RUN_BEFORE_STARTUP, + YarnConfiguration.DEFAULT_NM_HEALTH_CHECK_RUN_BEFORE_STARTUP); + // Determine time out String scriptTimeoutConfig = String.format( YarnConfiguration.NM_HEALTH_CHECK_SCRIPT_TIMEOUT_MS_TEMPLATE, @@ -113,7 +118,7 @@ public class NodeHealthScriptRunner extends TimedHealthReporterService { String[] scriptArgs = conf.getStrings(scriptArgsConfig, new String[]{}); return new NodeHealthScriptRunner(nodeHealthScript, - checkIntervalMs, scriptTimeout, scriptArgs); + checkIntervalMs, scriptTimeout, scriptArgs, runBeforeStartup); } private enum HealthCheckerExitStatus { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/health/TimedHealthReporterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/health/TimedHealthReporterService.java index a0c4d8b8eb..6a7a2911d2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/health/TimedHealthReporterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/health/TimedHealthReporterService.java @@ -45,6 +45,7 @@ public abstract class TimedHealthReporterService extends AbstractService private Timer timer; private TimerTask task; private long intervalMs; + private boolean runBeforeStartup; TimedHealthReporterService(String name, long intervalMs) { super(name); @@ -52,6 +53,17 @@ public abstract class TimedHealthReporterService extends AbstractService this.healthReport = ""; this.lastReportedTime = System.currentTimeMillis(); this.intervalMs = intervalMs; + this.runBeforeStartup = false; + } + + TimedHealthReporterService(String name, long intervalMs, + boolean runBeforeStartup) { + super(name); + this.isHealthy = true; + this.healthReport = ""; + this.lastReportedTime = System.currentTimeMillis(); + this.intervalMs = intervalMs; + this.runBeforeStartup = runBeforeStartup; } @VisibleForTesting @@ -73,7 +85,13 @@ public abstract class TimedHealthReporterService extends AbstractService throw new Exception("Health reporting task hasn't been set!"); } timer = new Timer("HealthReporterService-Timer", true); - timer.scheduleAtFixedRate(task, 0, intervalMs); + long delay = 0; + if (runBeforeStartup) { + delay = intervalMs; + task.run(); + } + + timer.scheduleAtFixedRate(task, delay, intervalMs); super.serviceStart(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java index b1fc2f1aa2..3f4879b23e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java @@ -18,6 +18,8 @@ package org.apache.hadoop.yarn.server.nodemanager; +import static org.mockito.Mockito.mock; + import java.io.File; import java.io.IOException; import java.util.ArrayList; @@ -134,6 +136,9 @@ public class TestEventFlow { new DummyContainerManager(context, exec, del, nodeStatusUpdater, metrics, dirsHandler); nodeStatusUpdater.init(conf); + NodeResourceMonitorImpl nodeResourceMonitor = mock( + NodeResourceMonitorImpl.class); + ((NMContext) context).setNodeResourceMonitor(nodeResourceMonitor); ((NMContext)context).setContainerManager(containerManager); nodeStatusUpdater.start(); ((NMContext)context).setNodeStatusUpdater(nodeStatusUpdater); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java index 7a85bfab44..9ee3ce6bc8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java @@ -26,6 +26,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.doNothing; +import org.apache.hadoop.yarn.server.nodemanager.NodeResourceMonitorImpl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -156,33 +157,21 @@ public abstract class BaseContainerManagerTest { protected NodeHealthCheckerService nodeHealthChecker; protected LocalDirsHandlerService dirsHandler; protected final long DUMMY_RM_IDENTIFIER = 1234; - - protected NodeStatusUpdater nodeStatusUpdater = new NodeStatusUpdaterImpl( - context, new AsyncDispatcher(), null, metrics) { - @Override - protected ResourceTracker getRMClient() { - return new LocalRMInterface(); - }; - - @Override - protected void stopRMProxy() { - return; - } - - @Override - protected void startStatusUpdater() { - return; // Don't start any updating thread. - } - - @Override - public long getRMIdentifier() { - // There is no real RM registration, simulate and set RMIdentifier - return DUMMY_RM_IDENTIFIER; - } - }; - + private NodeResourceMonitorImpl nodeResourceMonitor = mock( + NodeResourceMonitorImpl.class); + private NodeHealthCheckerService nodeHealthCheckerService; + private NodeStatusUpdater nodeStatusUpdater; protected ContainerManagerImpl containerManager = null; + public NodeStatusUpdater getNodeStatusUpdater() { + return nodeStatusUpdater; + } + + public void setNodeStatusUpdater( + NodeStatusUpdater nodeStatusUpdater) { + this.nodeStatusUpdater = nodeStatusUpdater; + } + protected ContainerExecutor createContainerExecutor() { DefaultContainerExecutor exec = new DefaultContainerExecutor(); exec.setConf(conf); @@ -218,11 +207,36 @@ public abstract class BaseContainerManagerTest { delSrvc.init(conf); dirsHandler = new LocalDirsHandlerService(); - nodeHealthChecker = new NodeHealthCheckerService(dirsHandler); - nodeHealthChecker.init(conf); + dirsHandler.init(conf); + nodeHealthCheckerService = new NodeHealthCheckerService(dirsHandler); + nodeStatusUpdater = new NodeStatusUpdaterImpl( + context, new AsyncDispatcher(), nodeHealthCheckerService, metrics) { + @Override + protected ResourceTracker getRMClient() { + return new LocalRMInterface(); + }; + + @Override + protected void stopRMProxy() { + return; + } + + @Override + protected void startStatusUpdater() { + return; // Don't start any updating thread. + } + + @Override + public long getRMIdentifier() { + // There is no real RM registration, simulate and set RMIdentifier + return DUMMY_RM_IDENTIFIER; + } + }; + containerManager = createContainerManager(delSrvc); ((NMContext)context).setContainerManager(containerManager); ((NMContext)context).setContainerExecutor(exec); + ((NMContext)context).setNodeResourceMonitor(nodeResourceMonitor); nodeStatusUpdater.init(conf); containerManager.init(conf); nodeStatusUpdater.start(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java index e215980882..4e63417bba 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java @@ -193,8 +193,8 @@ public class TestContainerManager extends BaseContainerManagerTest { @Override protected ContainerManagerImpl createContainerManager(DeletionService delSrvc) { - return new ContainerManagerImpl(context, exec, delSrvc, nodeStatusUpdater, - metrics, dirsHandler) { + return new ContainerManagerImpl(context, exec, delSrvc, + getNodeStatusUpdater(), metrics, dirsHandler) { @Override protected UserGroupInformation getRemoteUgi() throws YarnException { @@ -1704,7 +1704,7 @@ public class TestContainerManager extends BaseContainerManagerTest { @Test public void testNullTokens() throws Exception { ContainerManagerImpl cMgrImpl = - new ContainerManagerImpl(context, exec, delSrvc, nodeStatusUpdater, + new ContainerManagerImpl(context, exec, delSrvc, getNodeStatusUpdater(), metrics, dirsHandler); String strExceptionMsg = ""; try { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestNMProxy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestNMProxy.java index 5f023f02df..32ff5724c3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestNMProxy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestNMProxy.java @@ -65,8 +65,8 @@ public class TestNMProxy extends BaseContainerManagerTest { @Override protected ContainerManagerImpl createContainerManager(DeletionService delSrvc) { - return new ContainerManagerImpl(context, exec, delSrvc, nodeStatusUpdater, - metrics, dirsHandler) { + return new ContainerManagerImpl(context, exec, delSrvc, + getNodeStatusUpdater(), metrics, dirsHandler) { @Override public StartContainersResponse startContainers( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java index b21850cbcf..508b8bd091 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java @@ -131,7 +131,7 @@ public class TestContainerSchedulerQueuing extends BaseContainerManagerTest { protected ContainerManagerImpl createContainerManager( DeletionService delSrvc) { return new ContainerManagerImpl(context, exec, delSrvc, - nodeStatusUpdater, metrics, dirsHandler) { + getNodeStatusUpdater(), metrics, dirsHandler) { @Override protected UserGroupInformation getRemoteUgi() throws YarnException { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index 2c89ddd9e9..7d6feea6f1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -335,6 +335,7 @@ public class ResourceTrackerService extends AbstractService implements Resource capability = request.getResource(); String nodeManagerVersion = request.getNMVersion(); Resource physicalResource = request.getPhysicalResource(); + NodeStatus nodeStatus = request.getNodeStatus(); RegisterNodeManagerResponse response = recordFactory .newRecordInstance(RegisterNodeManagerResponse.class); @@ -426,7 +427,7 @@ public class ResourceTrackerService extends AbstractService implements if (oldNode == null) { RMNodeStartedEvent startEvent = new RMNodeStartedEvent(nodeId, request.getNMContainerStatuses(), - request.getRunningApplications()); + request.getRunningApplications(), nodeStatus); if (request.getLogAggregationReportsForApps() != null && !request.getLogAggregationReportsForApps().isEmpty()) { if (LOG.isDebugEnabled()) { @@ -462,7 +463,7 @@ public class ResourceTrackerService extends AbstractService implements this.rmContext.getRMNodes().put(nodeId, rmNode); this.rmContext.getDispatcher().getEventHandler() - .handle(new RMNodeStartedEvent(nodeId, null, null)); + .handle(new RMNodeStartedEvent(nodeId, null, null, nodeStatus)); } else { // Reset heartbeat ID since node just restarted. oldNode.resetLastNodeHeartBeatResponse(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index a565fe7565..68f44dc6d5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -36,6 +36,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; import org.apache.commons.collections.keyvalue.DefaultMapEntry; +import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -208,7 +209,8 @@ public class RMNodeImpl implements RMNode, EventHandler { RMNodeEventType, RMNodeEvent>(NodeState.NEW) //Transitions from NEW state - .addTransition(NodeState.NEW, NodeState.RUNNING, + .addTransition(NodeState.NEW, + EnumSet.of(NodeState.RUNNING, NodeState.UNHEALTHY), RMNodeEventType.STARTED, new AddNodeTransition()) .addTransition(NodeState.NEW, NodeState.NEW, RMNodeEventType.RESOURCE_UPDATE, @@ -707,7 +709,6 @@ public class RMNodeImpl implements RMNode, EventHandler { private void updateMetricsForRejoinedNode(NodeState previousNodeState) { ClusterMetrics metrics = ClusterMetrics.getMetrics(); - metrics.incrNumActiveNodes(); switch (previousNodeState) { case LOST: @@ -850,10 +851,10 @@ public class RMNodeImpl implements RMNode, EventHandler { } public static class AddNodeTransition implements - SingleArcTransition { + MultipleArcTransition { @Override - public void transition(RMNodeImpl rmNode, RMNodeEvent event) { + public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) { // Inform the scheduler RMNodeStartedEvent startEvent = (RMNodeStartedEvent) event; List containers = null; @@ -871,8 +872,6 @@ public class RMNodeImpl implements RMNode, EventHandler { if (previousRMNode != null) { ClusterMetrics.getMetrics().decrDecommisionedNMs(); } - // Increment activeNodes explicitly because this is a new node. - ClusterMetrics.getMetrics().incrNumActiveNodes(); containers = startEvent.getNMContainerStatuses(); if (containers != null && !containers.isEmpty()) { for (NMContainerStatus container : containers) { @@ -889,17 +888,37 @@ public class RMNodeImpl implements RMNode, EventHandler { } } - rmNode.context.getDispatcher().getEventHandler() - .handle(new NodeAddedSchedulerEvent(rmNode, containers)); - rmNode.context.getDispatcher().getEventHandler().handle( - new NodesListManagerEvent( - NodesListManagerEventType.NODE_USABLE, rmNode)); + NodeState nodeState; + NodeStatus nodeStatus = + startEvent.getNodeStatus(); + + if (nodeStatus == null) { + nodeState = NodeState.RUNNING; + reportNodeRunning(rmNode, containers); + } else { + RMNodeStatusEvent rmNodeStatusEvent = + new RMNodeStatusEvent(nodeId, nodeStatus); + + NodeHealthStatus nodeHealthStatus = + updateRMNodeFromStatusEvents(rmNode, rmNodeStatusEvent); + + if (nodeHealthStatus.getIsNodeHealthy()) { + nodeState = NodeState.RUNNING; + reportNodeRunning(rmNode, containers); + } else { + nodeState = NodeState.UNHEALTHY; + reportNodeUnusable(rmNode, nodeState); + } + } + List logAggregationReportsForApps = startEvent.getLogAggregationReportsForApps(); if (logAggregationReportsForApps != null && !logAggregationReportsForApps.isEmpty()) { rmNode.handleLogAggregationStatus(logAggregationReportsForApps); } + + return nodeState; } } @@ -1110,6 +1129,22 @@ public class RMNodeImpl implements RMNode, EventHandler { } } + /** + * Report node is RUNNING. + * @param rmNode + * @param containers + */ + public static void reportNodeRunning(RMNodeImpl rmNode, + List containers) { + rmNode.context.getDispatcher().getEventHandler() + .handle(new NodeAddedSchedulerEvent(rmNode, containers)); + rmNode.context.getDispatcher().getEventHandler().handle( + new NodesListManagerEvent( + NodesListManagerEventType.NODE_USABLE, rmNode)); + // Increment activeNodes explicitly because this is a new node. + ClusterMetrics.getMetrics().incrNumActiveNodes(); + } + /** * Report node is UNUSABLE and update metrics. * @param rmNode @@ -1301,6 +1336,7 @@ public class RMNodeImpl implements RMNode, EventHandler { // notifiers get update metadata because they will very likely query it // upon notification // Update metrics + ClusterMetrics.getMetrics().incrNumActiveNodes(); rmNode.updateMetricsForRejoinedNode(NodeState.UNHEALTHY); return NodeState.RUNNING; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStartedEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStartedEvent.java index 397699453f..2bf04d0fe7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStartedEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStartedEvent.java @@ -24,19 +24,23 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; +import org.apache.hadoop.yarn.server.api.records.NodeStatus; public class RMNodeStartedEvent extends RMNodeEvent { + private final NodeStatus nodeStatus; private List containerStatuses; private List runningApplications; private List logAggregationReportsForApps; public RMNodeStartedEvent(NodeId nodeId, List containerReports, - List runningApplications) { + List runningApplications, + NodeStatus nodeStatus) { super(nodeId, RMNodeEventType.STARTED); this.containerStatuses = containerReports; this.runningApplications = runningApplications; + this.nodeStatus = nodeStatus; } public List getNMContainerStatuses() { @@ -47,6 +51,10 @@ public class RMNodeStartedEvent extends RMNodeEvent { return runningApplications; } + public NodeStatus getNodeStatus() { + return nodeStatus; + } + public List getLogAggregationReportsForApps() { return this.logAggregationReportsForApps; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java index 3543bc4707..d433753701 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java @@ -18,6 +18,9 @@ package org.apache.hadoop.yarn.server.resourcemanager; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -187,6 +190,17 @@ public class MockNM { req.setNodeLabels(nodeLabels); } + NodeStatus status = Records.newRecord(NodeStatus.class); + status.setResponseId(0); + status.setNodeId(nodeId); + status.setContainersStatuses(new ArrayList<>(containerStats.values())); + NodeHealthStatus healthStatus = Records.newRecord(NodeHealthStatus.class); + healthStatus.setHealthReport(""); + healthStatus.setIsNodeHealthy(true); + healthStatus.setLastHealthReportTime(1); + status.setNodeHealthStatus(healthStatus); + req.setNodeStatus(status); + RegisterNodeManagerResponse registrationResponse = resourceTracker.registerNodeManager(req); this.currentContainerTokenMasterKey = @@ -364,6 +378,14 @@ public class MockNM { return heartbeatResponse; } + public static NodeStatus createMockNodeStatus() { + NodeStatus mockNodeStatus = mock(NodeStatus.class); + NodeHealthStatus mockNodeHealthStatus = mock(NodeHealthStatus.class); + when(mockNodeStatus.getNodeHealthStatus()).thenReturn(mockNodeHealthStatus); + when(mockNodeHealthStatus.getIsNodeHealthy()).thenReturn(true); + return mockNodeStatus; + } + public long getMemory() { return capability.getMemorySize(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index b3888c3cd6..90c554361c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -18,6 +18,8 @@ package org.apache.hadoop.yarn.server.resourcemanager; +import static org.apache.hadoop.yarn.server.resourcemanager.MockNM.createMockNodeStatus; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.security.token.Token; @@ -54,6 +56,7 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; +import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager; @@ -543,7 +546,9 @@ public class MockRM extends ResourceManager { public void sendNodeStarted(MockNM nm) throws Exception { RMNodeImpl node = (RMNodeImpl) getRMContext().getRMNodes().get( nm.getNodeId()); - node.handle(new RMNodeStartedEvent(nm.getNodeId(), null, null)); + NodeStatus mockNodeStatus = createMockNodeStatus(); + node.handle(new RMNodeStartedEvent(nm.getNodeId(), null, null, + mockNodeStatus)); drainEventsImplicitly(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java index 1e4b050816..06c4527e5b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java @@ -98,7 +98,7 @@ public class NodeManager implements ContainerManagementProtocol { public NodeManager(String hostName, int containerManagerPort, int httpPort, String rackName, Resource capability, - ResourceManager resourceManager) + ResourceManager resourceManager, NodeStatus nodestatus) throws IOException, YarnException { this.containerManagerAddress = hostName + ":" + containerManagerPort; this.nodeHttpAddress = hostName + ":" + httpPort; @@ -113,6 +113,7 @@ public class NodeManager implements ContainerManagementProtocol { request.setResource(capability); request.setNodeId(this.nodeId); request.setNMVersion(YarnVersionInfo.getVersion()); + request.setNodeStatus(nodestatus); resourceTrackerService.registerNodeManager(request); this.resourceManager = resourceManager; resourceManager.getResourceScheduler().getNodeReport(this.nodeId); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java index 1f1e164cf5..c907cb778e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.yarn.server.resourcemanager; +import static org.apache.hadoop.yarn.server.resourcemanager.MockNM.createMockNodeStatus; import static org.junit.Assert.assertEquals; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doAnswer; @@ -216,8 +217,9 @@ public class TestRMNodeTransitions { @Test (timeout = 5000) public void testExpiredContainer() { + NodeStatus mockNodeStatus = createMockNodeStatus(); // Start the node - node.handle(new RMNodeStartedEvent(null, null, null)); + node.handle(new RMNodeStartedEvent(null, null, null, mockNodeStatus)); verify(scheduler).handle(any(NodeAddedSchedulerEvent.class)); // Expire a container @@ -280,12 +282,13 @@ public class TestRMNodeTransitions { @Test (timeout = 5000) public void testContainerUpdate() throws InterruptedException{ + NodeStatus mockNodeStatus = createMockNodeStatus(); //Start the node - node.handle(new RMNodeStartedEvent(null, null, null)); + node.handle(new RMNodeStartedEvent(null, null, null, mockNodeStatus)); NodeId nodeId = BuilderUtils.newNodeId("localhost:1", 1); RMNodeImpl node2 = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null, null); - node2.handle(new RMNodeStartedEvent(null, null, null)); + node2.handle(new RMNodeStartedEvent(null, null, null, mockNodeStatus)); ApplicationId app0 = BuilderUtils.newApplicationId(0, 0); ApplicationId app1 = BuilderUtils.newApplicationId(1, 1); @@ -341,8 +344,9 @@ public class TestRMNodeTransitions { @Test (timeout = 5000) public void testStatusChange(){ + NodeStatus mockNodeStatus = createMockNodeStatus(); //Start the node - node.handle(new RMNodeStartedEvent(null, null, null)); + node.handle(new RMNodeStartedEvent(null, null, null, mockNodeStatus)); //Add info to the queue first node.setNextHeartBeat(false); @@ -607,6 +611,33 @@ public class TestRMNodeTransitions { Assert.assertEquals(NodeState.REBOOTED, node.getState()); } + @Test + public void testAddUnhealthyNode() { + ClusterMetrics cm = ClusterMetrics.getMetrics(); + int initialUnhealthy = cm.getUnhealthyNMs(); + int initialActive = cm.getNumActiveNMs(); + int initialLost = cm.getNumLostNMs(); + int initialDecommissioned = cm.getNumDecommisionedNMs(); + int initialRebooted = cm.getNumRebootedNMs(); + + NodeHealthStatus status = NodeHealthStatus.newInstance(false, "sick", + System.currentTimeMillis()); + NodeStatus nodeStatus = NodeStatus.newInstance(node.getNodeID(), 0, + new ArrayList<>(), null, status, null, null, null); + node.handle(new RMNodeStartedEvent(node.getNodeID(), null, null, + nodeStatus)); + + Assert.assertEquals("Unhealthy Nodes", + initialUnhealthy + 1, cm.getUnhealthyNMs()); + Assert.assertEquals("Active Nodes", initialActive, cm.getNumActiveNMs()); + Assert.assertEquals("Lost Nodes", initialLost, cm.getNumLostNMs()); + Assert.assertEquals("Decommissioned Nodes", + initialDecommissioned, cm.getNumDecommisionedNMs()); + Assert.assertEquals("Rebooted Nodes", + initialRebooted, cm.getNumRebootedNMs()); + Assert.assertEquals(NodeState.UNHEALTHY, node.getState()); + } + @Test public void testNMShutdown() { RMNodeImpl node = getRunningNode(); @@ -712,7 +743,9 @@ public class TestRMNodeTransitions { Resource capability = Resource.newInstance(4096, 4); RMNodeImpl node = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, capability, nmVersion); - node.handle(new RMNodeStartedEvent(node.getNodeID(), null, null)); + NodeStatus mockNodeStatus = createMockNodeStatus(); + node.handle(new RMNodeStartedEvent(node.getNodeID(), null, null, + mockNodeStatus)); Assert.assertEquals(NodeState.RUNNING, node.getState()); return node; } @@ -763,7 +796,10 @@ public class TestRMNodeTransitions { Resource capability = Resource.newInstance(4096, 4); RMNodeImpl node = new RMNodeImpl(nodeId, rmContext,null, 0, 0, null, capability, null); - node.handle(new RMNodeStartedEvent(node.getNodeID(), null, null)); + NodeStatus mockNodeStatus = createMockNodeStatus(); + + node.handle(new RMNodeStartedEvent(node.getNodeID(), null, null, + mockNodeStatus)); Assert.assertEquals(NodeState.RUNNING, node.getState()); node.handle(new RMNodeEvent(node.getNodeID(), RMNodeEventType.REBOOTING)); Assert.assertEquals(NodeState.REBOOTED, node.getState()); @@ -779,7 +815,9 @@ public class TestRMNodeTransitions { int initialUnhealthy = cm.getUnhealthyNMs(); int initialDecommissioned = cm.getNumDecommisionedNMs(); int initialRebooted = cm.getNumRebootedNMs(); - node.handle(new RMNodeStartedEvent(node.getNodeID(), null, null)); + NodeStatus mockNodeStatus = createMockNodeStatus(); + node.handle(new RMNodeStartedEvent(node.getNodeID(), null, null, + mockNodeStatus)); Assert.assertEquals("Active Nodes", initialActive + 1, cm.getNumActiveNMs()); Assert.assertEquals("Lost Nodes", initialLost, cm.getNumLostNMs()); Assert.assertEquals("Unhealthy Nodes", @@ -1075,8 +1113,9 @@ public class TestRMNodeTransitions { @Test public void testForHandlingDuplicatedCompltedContainers() { + NodeStatus mockNodeStatus = createMockNodeStatus(); // Start the node - node.handle(new RMNodeStartedEvent(null, null, null)); + node.handle(new RMNodeStartedEvent(null, null, null, mockNodeStatus)); // Add info to the queue first node.setNextHeartBeat(false); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java index 411b848217..1cb5e1d0e7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager; +import static org.apache.hadoop.yarn.server.resourcemanager.MockNM.createMockNodeStatus; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.fail; @@ -37,6 +38,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; @@ -88,12 +90,12 @@ public class TestResourceManager { private org.apache.hadoop.yarn.server.resourcemanager.NodeManager registerNode(String hostName, int containerManagerPort, int httpPort, - String rackName, Resource capability) throws IOException, - YarnException { + String rackName, Resource capability, NodeStatus nodeStatus) + throws IOException, YarnException { org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm = new org.apache.hadoop.yarn.server.resourcemanager.NodeManager( hostName, containerManagerPort, httpPort, rackName, capability, - resourceManager); + resourceManager, nodeStatus); NodeAddedSchedulerEvent nodeAddEvent1 = new NodeAddedSchedulerEvent(resourceManager.getRMContext() .getRMNodes().get(nm.getNodeId())); @@ -109,26 +111,30 @@ public class TestResourceManager { final int memory = 4 * 1024; final int vcores = 4; - + + NodeStatus mockNodeStatus = createMockNodeStatus(); + // Register node1 String host1 = "host1"; org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm1 = registerNode(host1, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(memory, vcores)); + Resources.createResource(memory, vcores), mockNodeStatus); // Register node2 String host2 = "host2"; org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm2 = registerNode(host2, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(memory/2, vcores/2)); + Resources.createResource(memory/2, vcores/2), mockNodeStatus); // nodes should be in RUNNING state RMNodeImpl node1 = (RMNodeImpl) resourceManager.getRMContext().getRMNodes().get( nm1.getNodeId()); RMNodeImpl node2 = (RMNodeImpl) resourceManager.getRMContext().getRMNodes().get( nm2.getNodeId()); - node1.handle(new RMNodeStartedEvent(nm1.getNodeId(), null, null)); - node2.handle(new RMNodeStartedEvent(nm2.getNodeId(), null, null)); + node1.handle(new RMNodeStartedEvent(nm1.getNodeId(), null, null, + mockNodeStatus)); + node2.handle(new RMNodeStartedEvent(nm2.getNodeId(), null, null, + mockNodeStatus)); // Submit an application Application application = new Application("user1", resourceManager); @@ -216,9 +222,12 @@ public class TestResourceManager { public void testNodeHealthReportIsNotNull() throws Exception{ String host1 = "host1"; final int memory = 4 * 1024; + + NodeStatus mockNodeStatus = createMockNodeStatus(); + org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm1 = - registerNode(host1, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(memory, 1)); + registerNode(host1, 1234, 2345, NetworkTopology.DEFAULT_RACK, + Resources.createResource(memory, 1), mockNodeStatus); nm1.heartbeat(); nm1.heartbeat(); Collection values = resourceManager.getRMContext().getRMNodes().values(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java index 6690339d89..066e394561 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java @@ -31,6 +31,8 @@ import org.apache.hadoop.yarn.server.api.ServerRMProxy; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeToAttributes; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.NodeEventDispatcher; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.FileSystemNodeAttributeStore; + +import static org.apache.hadoop.yarn.server.resourcemanager.MockNM.createMockNodeStatus; import static org.junit.Assert.assertEquals; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; @@ -2712,10 +2714,14 @@ public class TestResourceTrackerService extends NodeLabelTestBase { RegisterNodeManagerRequest.class); NodeId nodeId = NodeId.newInstance("host2", 1234); Resource capability = BuilderUtils.newResource(1024, 1); + + NodeStatus mockNodeStatus = createMockNodeStatus(); + req.setResource(capability); req.setNodeId(nodeId); req.setHttpPort(1234); req.setNMVersion(YarnVersionInfo.getVersion()); + req.setNodeStatus(mockNodeStatus); ContainerId c1 = ContainerId.newContainerId(appAttemptId, 1); ContainerId c2 = ContainerId.newContainerId(appAttemptId, 2); ContainerId c3 = ContainerId.newContainerId(appAttemptId, 3); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java index 8d31fe1a8b..6836288ed1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.logaggregationstatus; +import static org.apache.hadoop.yarn.server.resourcemanager.MockNM.createMockNodeStatus; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; @@ -139,13 +140,15 @@ public class TestRMAppLogAggregationStatus { Resource capability = Resource.newInstance(4096, 4); RMNodeImpl node1 = new RMNodeImpl(nodeId1, rmContext, null, 0, 0, null, capability, null); - node1.handle(new RMNodeStartedEvent(nodeId1, null, null)); + NodeStatus mockNodeStatus = createMockNodeStatus(); + node1.handle(new RMNodeStartedEvent(nodeId1, null, null, mockNodeStatus)); rmApp.handle(new RMAppRunningOnNodeEvent(this.appId, nodeId1)); NodeId nodeId2 = NodeId.newInstance("localhost", 2345); RMNodeImpl node2 = new RMNodeImpl(nodeId2, rmContext, null, 0, 0, null, capability, null); - node2.handle(new RMNodeStartedEvent(node2.getNodeID(), null, null)); + node2.handle(new RMNodeStartedEvent(node2.getNodeID(), null, null, + mockNodeStatus)); rmApp.handle(new RMAppRunningOnNodeEvent(this.appId, nodeId2)); // The initial log aggregation status for these two nodes diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java index f69faf4ea5..017a1e021d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java @@ -18,6 +18,9 @@ package org.apache.hadoop.yarn.server.resourcemanager.resourcetracker; +import static org.apache.hadoop.yarn.server.resourcemanager.MockNM.createMockNodeStatus; + +import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.junit.Assert; import org.slf4j.Logger; @@ -135,12 +138,15 @@ public class TestNMExpiry { String hostname3 = "localhost3"; Resource capability = BuilderUtils.newResource(1024, 1); + NodeStatus mockNodeStatus = createMockNodeStatus(); + RegisterNodeManagerRequest request1 = recordFactory .newRecordInstance(RegisterNodeManagerRequest.class); NodeId nodeId1 = NodeId.newInstance(hostname1, 0); request1.setNodeId(nodeId1); request1.setHttpPort(0); request1.setResource(capability); + request1.setNodeStatus(mockNodeStatus); resourceTrackerService.registerNodeManager(request1); RegisterNodeManagerRequest request2 = recordFactory @@ -149,6 +155,7 @@ public class TestNMExpiry { request2.setNodeId(nodeId2); request2.setHttpPort(0); request2.setResource(capability); + request2.setNodeStatus(mockNodeStatus); resourceTrackerService.registerNodeManager(request2); int waitCount = 0; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java index 3c4e6b424d..817fb9dfc3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java @@ -18,6 +18,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.resourcetracker; +import static org.apache.hadoop.yarn.server.resourcemanager.MockNM.createMockNodeStatus; + import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -36,6 +38,7 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; import org.apache.hadoop.yarn.server.api.records.NodeAction; +import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.ParameterizedSchedulerTestBase; @@ -178,9 +181,13 @@ public class TestNMReconnect extends ParameterizedSchedulerTestBase { RegisterNodeManagerRequest request1 = recordFactory .newRecordInstance(RegisterNodeManagerRequest.class); NodeId nodeId1 = NodeId.newInstance(hostname1, 0); + + NodeStatus mockNodeStatus = createMockNodeStatus(); + request1.setNodeId(nodeId1); request1.setHttpPort(0); request1.setResource(capability); + request1.setNodeStatus(mockNodeStatus); resourceTrackerService.registerNodeManager(request1); Assert.assertNotNull(context.getRMNodes().get(nodeId1)); // verify Scheduler and RMContext use same RMNode reference. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java index e67deb5245..2860335d8a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler; +import static org.apache.hadoop.yarn.server.resourcemanager.MockNM.createMockNodeStatus; import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -49,6 +50,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; +import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.resourcemanager.MockAM; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; @@ -1051,9 +1053,12 @@ public class TestAbstractYarnScheduler extends ParameterizedSchedulerTestBase { RegisterNodeManagerRequest request1 = recordFactory.newRecordInstance(RegisterNodeManagerRequest.class); NodeId nodeId1 = NodeId.newInstance(hostname1, 0); + NodeStatus mockNodeStatus = createMockNodeStatus(); + request1.setNodeId(nodeId1); request1.setHttpPort(0); request1.setResource(capability); + request1.setNodeStatus(mockNodeStatus); privateResourceTrackerService.registerNodeManager(request1); privateDispatcher.await(); Resource clusterResource = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerHealth.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerHealth.java index 83a354de5a..a75be7745f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerHealth.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerHealth.java @@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.resourcemanager.NodeManager; import org.apache.hadoop.yarn.server.resourcemanager.Application; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; @@ -43,6 +44,7 @@ import org.junit.Test; import java.io.IOException; +import static org.apache.hadoop.yarn.server.resourcemanager.MockNM.createMockNodeStatus; import static org.junit.Assume.assumeTrue; public class TestSchedulerHealth { @@ -170,11 +172,11 @@ public class TestSchedulerHealth { } private NodeManager registerNode(String hostName, int containerManagerPort, - int httpPort, String rackName, Resource capability) throws IOException, - YarnException { + int httpPort, String rackName, Resource capability, NodeStatus nodeStatus) + throws IOException, YarnException { NodeManager nm = new NodeManager(hostName, containerManagerPort, httpPort, rackName, - capability, resourceManager); + capability, resourceManager, nodeStatus); NodeAddedSchedulerEvent nodeAddEvent1 = new NodeAddedSchedulerEvent(resourceManager.getRMContext().getRMNodes() .get(nm.getNodeId())); @@ -200,11 +202,13 @@ public class TestSchedulerHealth { assumeTrue("This test is only supported on Capacity Scheduler", isCapacityScheduler); + NodeStatus mockNodeStatus = createMockNodeStatus(); + // Register node1 String host_0 = "host_0"; NodeManager nm_0 = registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(5 * 1024, 1)); + Resources.createResource(5 * 1024, 1), mockNodeStatus); // ResourceRequest priorities Priority priority_0 = Priority.newInstance(0); @@ -275,15 +279,17 @@ public class TestSchedulerHealth { assumeTrue("This test is only supported on Capacity Scheduler", isCapacityScheduler); + NodeStatus mockNodeStatus = createMockNodeStatus(); + // Register nodes String host_0 = "host_0"; NodeManager nm_0 = registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(2 * 1024, 1)); + Resources.createResource(2 * 1024, 1), mockNodeStatus); String host_1 = "host_1"; NodeManager nm_1 = registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(5 * 1024, 1)); + Resources.createResource(5 * 1024, 1), mockNodeStatus); nodeUpdate(nm_0); nodeUpdate(nm_1); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index a746f06f27..1fe7a53107 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; +import static org.apache.hadoop.yarn.server.resourcemanager.MockNM.createMockNodeStatus; import static org.assertj.core.api.Assertions.assertThat; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.MAXIMUM_ALLOCATION; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.MAXIMUM_ALLOCATION_MB; @@ -54,6 +55,7 @@ import java.util.concurrent.CyclicBarrier; import com.google.common.collect.Sets; import org.apache.hadoop.service.ServiceStateException; +import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -242,9 +244,10 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase { private NodeManager registerNode(ResourceManager rm, String hostName, int containerManagerPort, int httpPort, String rackName, - Resource capability) throws IOException, YarnException { + Resource capability, NodeStatus nodeStatus) + throws IOException, YarnException { NodeManager nm = new NodeManager(hostName, - containerManagerPort, httpPort, rackName, capability, rm); + containerManagerPort, httpPort, rackName, capability, rm, nodeStatus); NodeAddedSchedulerEvent nodeAddEvent1 = new NodeAddedSchedulerEvent(rm.getRMContext().getRMNodes() .get(nm.getNodeId())); @@ -286,11 +289,11 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase { } private NodeManager registerNode(String hostName, int containerManagerPort, - int httpPort, String rackName, - Resource capability) - throws IOException, YarnException { + int httpPort, String rackName, + Resource capability, NodeStatus nodeStatus) + throws IOException, YarnException { NodeManager nm = new NodeManager(hostName, containerManagerPort, httpPort, - rackName, capability, resourceManager); + rackName, capability, resourceManager, nodeStatus); NodeAddedSchedulerEvent nodeAddEvent1 = new NodeAddedSchedulerEvent(resourceManager.getRMContext() .getRMNodes().get(nm.getNodeId())); @@ -303,17 +306,19 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase { LOG.info("--- START: testCapacityScheduler ---"); + NodeStatus mockNodeStatus = createMockNodeStatus(); + // Register node1 String host_0 = "host_0"; NodeManager nm_0 = registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(4 * GB, 1)); + Resources.createResource(4 * GB, 1), mockNodeStatus); // Register node2 String host_1 = "host_1"; NodeManager nm_1 = registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(2 * GB, 1)); + Resources.createResource(2 * GB, 1), mockNodeStatus); // ResourceRequest priorities Priority priority_0 = Priority.newInstance(0); @@ -443,11 +448,13 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase { when(mC.getConfigurationProvider()).thenReturn( new LocalConfigurationProvider()); + NodeStatus mockNodeStatus = createMockNodeStatus(); + // Register node1 String host0 = "host_0"; NodeManager nm0 = registerNode(rm, host0, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(10 * GB, 10)); + Resources.createResource(10 * GB, 10), mockNodeStatus); // ResourceRequest priorities Priority priority0 = Priority.newInstance(0); @@ -545,11 +552,13 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase { when(mC.getConfigurationProvider()).thenReturn( new LocalConfigurationProvider()); + NodeStatus mockNodeStatus = createMockNodeStatus(); + // Register node1 String host0 = "host_0"; NodeManager nm0 = registerNode(rm, host0, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(10 * GB, 10)); + Resources.createResource(10 * GB, 10), mockNodeStatus); // ResourceRequest priorities Priority priority0 = Priority.newInstance(0); @@ -2097,17 +2106,20 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase { public void testMoveAppForMoveToQueueWithFreeCap() throws Exception { ResourceScheduler scheduler = resourceManager.getResourceScheduler(); + + NodeStatus mockNodeStatus = createMockNodeStatus(); + // Register node1 String host_0 = "host_0"; NodeManager nm_0 = registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(4 * GB, 1)); + Resources.createResource(4 * GB, 1), mockNodeStatus); // Register node2 String host_1 = "host_1"; NodeManager nm_1 = registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(2 * GB, 1)); + Resources.createResource(2 * GB, 1), mockNodeStatus); // ResourceRequest priorities Priority priority_0 = Priority.newInstance(0); @@ -2213,17 +2225,19 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase { ResourceScheduler scheduler = resourceManager.getResourceScheduler(); + NodeStatus mockNodeStatus = createMockNodeStatus(); + // Register node1 String host_0 = "host_0"; NodeManager nm_0 = registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(5 * GB, 1)); + Resources.createResource(5 * GB, 1), mockNodeStatus); // Register node2 String host_1 = "host_1"; NodeManager nm_1 = registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(5 * GB, 1)); + Resources.createResource(5 * GB, 1), mockNodeStatus); // ResourceRequest priorities Priority priority_0 = Priority.newInstance(0); @@ -2335,11 +2349,13 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase { ResourceScheduler scheduler = resourceManager.getResourceScheduler(); + NodeStatus mockNodeStatus = createMockNodeStatus(); + // Register node1 String host_0 = "host_0"; NodeManager nm_0 = registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(6 * GB, 1)); + Resources.createResource(6 * GB, 1), mockNodeStatus); // ResourceRequest priorities Priority priority_0 = Priority.newInstance(0); @@ -2383,17 +2399,19 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase { public void testMoveAppQueueMetricsCheck() throws Exception { ResourceScheduler scheduler = resourceManager.getResourceScheduler(); + NodeStatus mockNodeStatus = createMockNodeStatus(); + // Register node1 String host_0 = "host_0"; NodeManager nm_0 = registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(5 * GB, 1)); + Resources.createResource(5 * GB, 1), mockNodeStatus); // Register node2 String host_1 = "host_1"; NodeManager nm_1 = registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(5 * GB, 1)); + Resources.createResource(5 * GB, 1), mockNodeStatus); // ResourceRequest priorities Priority priority_0 = Priority.newInstance(0); @@ -4594,9 +4612,12 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase { } @Test public void testRemovedNodeDecomissioningNode() throws Exception { + NodeStatus mockNodeStatus = createMockNodeStatus(); + // Register nodemanager NodeManager nm = registerNode("host_decom", 1234, 2345, - NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4)); + NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4), + mockNodeStatus); RMNode node = resourceManager.getRMContext().getRMNodes().get(nm.getNodeId()); @@ -4639,10 +4660,14 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase { ((CapacityScheduler) resourceManager.getResourceScheduler()) .setRMContext(spyContext); ((AsyncDispatcher) mockDispatcher).start(); + + NodeStatus mockNodeStatus = createMockNodeStatus(); + // Register node String host_0 = "host_0"; NodeManager nm_0 = registerNode(host_0, 1234, 2345, - NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4)); + NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4), + mockNodeStatus); // ResourceRequest priorities Priority priority_0 = Priority.newInstance(0); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 2e043fb048..05ec09e667 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -50,6 +50,7 @@ import org.apache.hadoop.yarn.exceptions.SchedulerInvalidResoureRequestException import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.security.YarnAuthorizationProvider; +import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService; import org.apache.hadoop.yarn.server.resourcemanager.MockAM; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; @@ -124,6 +125,7 @@ import java.util.List; import java.util.Set; import java.util.stream.Collectors; +import static org.apache.hadoop.yarn.server.resourcemanager.MockNM.createMockNodeStatus; import static org.assertj.core.api.Assertions.assertThat; import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES; import static org.junit.Assert.assertEquals; @@ -4862,9 +4864,12 @@ public class TestFairScheduler extends FairSchedulerTestBase { @Test public void testRemovedNodeDecomissioningNode() throws Exception { + NodeStatus mockNodeStatus = createMockNodeStatus(); + // Register nodemanager NodeManager nm = registerNode("host_decom", 1234, 2345, - NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4)); + NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4), + mockNodeStatus); RMNode node = resourceManager.getRMContext().getRMNodes().get(nm.getNodeId()); @@ -4907,10 +4912,14 @@ public class TestFairScheduler extends FairSchedulerTestBase { ((FairScheduler) resourceManager.getResourceScheduler()) .setRMContext(spyContext); ((AsyncDispatcher) mockDispatcher).start(); + + NodeStatus mockNodeStatus = createMockNodeStatus(); + // Register node String host_0 = "host_0"; NodeManager nm_0 = registerNode(host_0, 1234, 2345, - NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4)); + NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4), + mockNodeStatus); RMNode node = resourceManager.getRMContext().getRMNodes().get(nm_0.getNodeId()); @@ -4949,11 +4958,13 @@ public class TestFairScheduler extends FairSchedulerTestBase { } private NodeManager registerNode(String hostName, int containerManagerPort, - int httpPort, String rackName, - Resource capability) + int httpPort, String rackName, + Resource capability, NodeStatus nodeStatus) throws IOException, YarnException { + NodeStatus mockNodeStatus = createMockNodeStatus(); + NodeManager nm = new NodeManager(hostName, containerManagerPort, httpPort, - rackName, capability, resourceManager); + rackName, capability, resourceManager, mockNodeStatus); // after YARN-5375, scheduler event is processed in rm main dispatcher, // wait it processed, or may lead dead lock diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java index 01fb6a79b4..9b3657e00d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo; +import static org.apache.hadoop.yarn.server.resourcemanager.MockNM.createMockNodeStatus; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -33,6 +34,7 @@ import java.util.List; import java.util.Map; import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.event.Level; @@ -143,10 +145,10 @@ public class TestFifoScheduler { private NodeManager registerNode(String hostName, int containerManagerPort, int nmHttpPort, String rackName, - Resource capability) + Resource capability, NodeStatus nodeStatus) throws IOException, YarnException { NodeManager nm = new NodeManager(hostName, containerManagerPort, - nmHttpPort, rackName, capability, resourceManager); + nmHttpPort, rackName, capability, resourceManager, nodeStatus); NodeAddedSchedulerEvent nodeAddEvent1 = new NodeAddedSchedulerEvent(resourceManager.getRMContext().getRMNodes() .get(nm.getNodeId())); @@ -406,19 +408,21 @@ public class TestFifoScheduler { LOG.info("--- START: testFifoScheduler ---"); final int GB = 1024; - + + NodeStatus mockNodeStatus = createMockNodeStatus(); + // Register node1 String host_0 = "host_0"; org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_0 = registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(4 * GB, 1)); + Resources.createResource(4 * GB, 1), mockNodeStatus); nm_0.heartbeat(); // Register node2 String host_1 = "host_1"; org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_1 = registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(2 * GB, 1)); + Resources.createResource(2 * GB, 1), mockNodeStatus); nm_1.heartbeat(); // ResourceRequest priorities @@ -1197,9 +1201,12 @@ public class TestFifoScheduler { @Test public void testRemovedNodeDecomissioningNode() throws Exception { + NodeStatus mockNodeStatus = createMockNodeStatus(); + // Register nodemanager NodeManager nm = registerNode("host_decom", 1234, 2345, - NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4)); + NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4), + mockNodeStatus); RMNode node = resourceManager.getRMContext().getRMNodes().get(nm.getNodeId()); @@ -1242,10 +1249,14 @@ public class TestFifoScheduler { ((FifoScheduler) resourceManager.getResourceScheduler()) .setRMContext(spyContext); ((AsyncDispatcher) mockDispatcher).start(); + + NodeStatus mockNodeStatus = createMockNodeStatus(); + // Register node String host_0 = "host_0"; NodeManager nm_0 = registerNode(host_0, 1234, 2345, - NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4)); + NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4), + mockNodeStatus); // ResourceRequest priorities Priority priority_0 = Priority.newInstance(0); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java index c3f41f62f6..dc028fe743 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.webapp; +import static org.apache.hadoop.yarn.server.resourcemanager.MockNM.createMockNodeStatus; import static org.assertj.core.api.Assertions.assertThat; import static org.apache.hadoop.yarn.webapp.WebServicesTestUtils.assertResponseStatusCode; import static org.junit.Assert.assertEquals; @@ -241,8 +242,10 @@ public class TestRMWebServicesNodes extends JerseyTestBase { } private void sendStartedEvent(RMNode node) { + NodeStatus mockNodeStatus = createMockNodeStatus(); ((RMNodeImpl) node) - .handle(new RMNodeStartedEvent(node.getNodeID(), null, null)); + .handle(new RMNodeStartedEvent(node.getNodeID(), null, null, + mockNodeStatus)); } private void sendLostEvent(RMNode node) {