YARN-1012. Report NM aggregated container resource utilization in heartbeat. (Inigo Goiri via kasha)

This commit is contained in:
Karthik Kambatla 2015-07-09 09:35:14 -07:00
parent fffb15bb43
commit 527c40e4d6
11 changed files with 387 additions and 8 deletions

View File

@ -131,6 +131,9 @@ Release 2.8.0 - UNRELEASED
YARN-41. The RM should handle the graceful shutdown of the NM. (Devaraj K via
junping_du)
YARN-1012. Report NM aggregated container resource utilization in heartbeat.
(Inigo Goiri via kasha)
IMPROVEMENTS
YARN-644. Basic null check is not performed on passed in arguments before

View File

@ -68,7 +68,7 @@ public void testResourceTrackerOnHA() throws Exception {
failoverThread = createAndStartFailoverThread();
NodeStatus status =
NodeStatus.newInstance(NodeId.newInstance("localhost", 0), 0, null,
null, null);
null, null, null);
NodeHeartbeatRequest request2 =
NodeHeartbeatRequest.newInstance(status, null, null,null);
resourceTracker.nodeHeartbeat(request2);

View File

@ -19,24 +19,48 @@
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.util.Records;
/**
* {@code NodeStatus} is a summary of the status of the node.
* <p>
* It includes information such as:
* <ul>
* <li>Node information and status..</li>
* <li>Container status.</li>
* </ul>
*/
public abstract class NodeStatus {
/**
* Create a new {@code NodeStatus}.
* @param nodeId Identifier for this node.
* @param responseId Identifier for the response.
* @param containerStatuses Status of the containers running in this node.
* @param keepAliveApplications Applications to keep alive.
* @param nodeHealthStatus Health status of the node.
* @param containersUtilizations Utilization of the containers in this node.
* @return New {@code NodeStatus} with the provided information.
*/
public static NodeStatus newInstance(NodeId nodeId, int responseId,
List<ContainerStatus> containerStatuses,
List<ApplicationId> keepAliveApplications,
NodeHealthStatus nodeHealthStatus) {
NodeHealthStatus nodeHealthStatus,
ResourceUtilization containersUtilization) {
NodeStatus nodeStatus = Records.newRecord(NodeStatus.class);
nodeStatus.setResponseId(responseId);
nodeStatus.setNodeId(nodeId);
nodeStatus.setContainersStatuses(containerStatuses);
nodeStatus.setKeepAliveApplications(keepAliveApplications);
nodeStatus.setNodeHealthStatus(nodeHealthStatus);
nodeStatus.setContainersUtilization(containersUtilization);
return nodeStatus;
}
@ -55,4 +79,17 @@ public abstract void setContainersStatuses(
public abstract void setNodeId(NodeId nodeId);
public abstract void setResponseId(int responseId);
/**
* Get the <em>resource utilization</em> of the containers.
* @return <em>resource utilization</em> of the containers
*/
@Public
@Stable
public abstract ResourceUtilization getContainersUtilization();
@Private
@Unstable
public abstract void setContainersUtilization(
ResourceUtilization containersUtilization);
}

View File

@ -0,0 +1,133 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.api.records;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.yarn.util.Records;
/**
* <p>
* <code>ResourceUtilization</code> models the utilization of a set of computer
* resources in the cluster.
* </p>
*/
@Private
@Evolving
public abstract class ResourceUtilization implements
Comparable<ResourceUtilization> {
public static ResourceUtilization newInstance(int pmem, int vmem, float cpu) {
ResourceUtilization utilization =
Records.newRecord(ResourceUtilization.class);
utilization.setPhysicalMemory(pmem);
utilization.setVirtualMemory(vmem);
utilization.setCPU(cpu);
return utilization;
}
/**
* Get used <em>virtual memory</em>.
*
* @return <em>virtual memory</em> in MB
*/
public abstract int getVirtualMemory();
/**
* Set used <em>virtual memory</em>.
*
* @param vmem <em>virtual memory</em> in MB
*/
public abstract void setVirtualMemory(int vmem);
/**
* Get <em>physical memory</em>.
*
* @return <em>physical memory</em> in MB
*/
public abstract int getPhysicalMemory();
/**
* Set <em>physical memory</em>.
*
* @param pmem <em>physical memory</em> in MB
*/
public abstract void setPhysicalMemory(int pmem);
/**
* Get <em>CPU</em> utilization.
*
* @return <em>CPU utilization</em> normalized to 1 CPU
*/
public abstract float getCPU();
/**
* Set <em>CPU</em> utilization.
*
* @param cpu <em>CPU utilization</em> normalized to 1 CPU
*/
public abstract void setCPU(float cpu);
@Override
public int hashCode() {
final int prime = 263167;
int result = 3571;
result = prime * result + getVirtualMemory();
result = prime * result + getPhysicalMemory();
result = 31 * result + Float.valueOf(getCPU()).hashCode();
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (!(obj instanceof ResourceUtilization)) {
return false;
}
ResourceUtilization other = (ResourceUtilization) obj;
if (getVirtualMemory() != other.getVirtualMemory()
|| getPhysicalMemory() != other.getPhysicalMemory()
|| getCPU() != other.getCPU()) {
return false;
}
return true;
}
@Override
public String toString() {
return "<pmem:" + getPhysicalMemory() + ", vmem:" + getVirtualMemory()
+ ", vCores:" + getCPU() + ">";
}
/**
* Add utilization to the current one.
* @param pmem Physical memory used to add.
* @param vmem Virtual memory used to add.
* @param cpu CPU utilization to add.
*/
public void addTo(int pmem, int vmem, float cpu) {
this.setPhysicalMemory(this.getPhysicalMemory() + pmem);
this.setVirtualMemory(this.getVirtualMemory() + vmem);
this.setCPU(this.getCPU() + cpu);
}
}

View File

@ -35,9 +35,10 @@
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeHealthStatusProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProtoOrBuilder;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.ResourceUtilizationProto;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.api.records.ResourceUtilization;
public class NodeStatusPBImpl extends NodeStatus {
NodeStatusProto proto = NodeStatusProto.getDefaultInstance();
@ -291,6 +292,28 @@ public synchronized void setNodeHealthStatus(NodeHealthStatus healthStatus) {
this.nodeHealthStatus = healthStatus;
}
@Override
public ResourceUtilization getContainersUtilization() {
NodeStatusProtoOrBuilder p =
this.viaProto ? this.proto : this.builder;
if (!p.hasContainersUtilization()) {
return null;
}
return convertFromProtoFormat(p.getContainersUtilization());
}
@Override
public void setContainersUtilization(
ResourceUtilization containersUtilization) {
maybeInitBuilder();
if (containersUtilization == null) {
this.builder.clearContainersUtilization();
return;
}
this.builder
.setContainersUtilization(convertToProtoFormat(containersUtilization));
}
private NodeIdProto convertToProtoFormat(NodeId nodeId) {
return ((NodeIdPBImpl)nodeId).getProto();
}
@ -323,4 +346,13 @@ private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto c) {
private ApplicationIdProto convertToProtoFormat(ApplicationId c) {
return ((ApplicationIdPBImpl)c).getProto();
}
private ResourceUtilizationProto convertToProtoFormat(ResourceUtilization r) {
return ((ResourceUtilizationPBImpl) r).getProto();
}
private ResourceUtilizationPBImpl convertFromProtoFormat(
ResourceUtilizationProto p) {
return new ResourceUtilizationPBImpl(p);
}
}

View File

@ -0,0 +1,104 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.api.records.impl.pb;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.ResourceUtilizationProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.ResourceUtilizationProtoOrBuilder;
import org.apache.hadoop.yarn.server.api.records.ResourceUtilization;
@Private
@Unstable
public class ResourceUtilizationPBImpl extends ResourceUtilization {
private ResourceUtilizationProto proto = ResourceUtilizationProto
.getDefaultInstance();
private ResourceUtilizationProto.Builder builder = null;
private boolean viaProto = false;
public ResourceUtilizationPBImpl() {
builder = ResourceUtilizationProto.newBuilder();
}
public ResourceUtilizationPBImpl(ResourceUtilizationProto proto) {
this.proto = proto;
viaProto = true;
}
public ResourceUtilizationProto getProto() {
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
private void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = ResourceUtilizationProto.newBuilder(proto);
}
viaProto = false;
}
@Override
public int getPhysicalMemory() {
ResourceUtilizationProtoOrBuilder p = viaProto ? proto : builder;
return (p.getPmem());
}
@Override
public void setPhysicalMemory(int pmem) {
maybeInitBuilder();
builder.setPmem(pmem);
}
@Override
public int getVirtualMemory() {
ResourceUtilizationProtoOrBuilder p = viaProto ? proto : builder;
return (p.getVmem());
}
@Override
public void setVirtualMemory(int vmem) {
maybeInitBuilder();
builder.setPmem(vmem);
}
@Override
public float getCPU() {
ResourceUtilizationProtoOrBuilder p = viaProto ? proto : builder;
return p.getCpu();
}
@Override
public void setCPU(float cpu) {
maybeInitBuilder();
builder.setCpu(cpu);
}
@Override
public int compareTo(ResourceUtilization other) {
int diff = this.getPhysicalMemory() - other.getPhysicalMemory();
if (diff == 0) {
diff = this.getVirtualMemory() - other.getVirtualMemory();
if (diff == 0) {
diff = Float.compare(this.getCPU(), other.getCPU());
}
}
return diff;
}
}

View File

@ -0,0 +1,19 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/** Server records. */
package org.apache.hadoop.yarn.server.api.records;

View File

@ -36,6 +36,7 @@ message NodeStatusProto {
repeated ContainerStatusProto containersStatuses = 3;
optional NodeHealthStatusProto nodeHealthStatus = 4;
repeated ApplicationIdProto keep_alive_applications = 5;
optional ResourceUtilizationProto containers_utilization = 6;
}
message MasterKeyProto {
@ -52,4 +53,10 @@ message NodeHealthStatusProto {
message VersionProto {
optional int32 major_version = 1;
optional int32 minor_version = 2;
}
message ResourceUtilizationProto {
optional int32 pmem = 1;
optional int32 vmem = 2;
optional float cpu = 3;
}

View File

@ -73,13 +73,14 @@
import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils;
import org.apache.hadoop.yarn.util.YarnVersionInfo;
@ -429,13 +430,27 @@ private NodeStatus getNodeStatus(int responseId) throws IOException {
+ ", " + nodeHealthStatus.getHealthReport());
}
List<ContainerStatus> containersStatuses = getContainerStatuses();
ResourceUtilization containersUtilization = getContainersUtilization();
NodeStatus nodeStatus =
NodeStatus.newInstance(nodeId, responseId, containersStatuses,
createKeepAliveApplicationList(), nodeHealthStatus);
createKeepAliveApplicationList(), nodeHealthStatus,
containersUtilization);
return nodeStatus;
}
/**
* Get the aggregated utilization of the containers in this node.
* @return Resource utilization of all the containers.
*/
private ResourceUtilization getContainersUtilization() {
ContainerManagerImpl containerManager =
(ContainerManagerImpl) this.context.getContainerManager();
ContainersMonitor containersMonitor =
containerManager.getContainersMonitor();
return containersMonitor.getContainersUtilization();
}
// Iterate through the NMContext and clone and get all the containers'
// statuses. If it's a completed container, add into the
// recentlyStoppedContainers collections.

View File

@ -19,10 +19,11 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.server.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.nodemanager.ResourceView;
public interface ContainersMonitor extends Service,
EventHandler<ContainersMonitorEvent>, ResourceView {
public ResourceUtilization getContainersUtilization();
}

View File

@ -35,6 +35,7 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.server.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent;
@ -78,6 +79,8 @@ public class ContainersMonitorImpl extends AbstractService implements
private static final long UNKNOWN_MEMORY_LIMIT = -1L;
private int nodeCpuPercentageForYARN;
private ResourceUtilization containersUtilization;
public ContainersMonitorImpl(ContainerExecutor exec,
AsyncDispatcher dispatcher, Context context) {
super("containers-monitor");
@ -89,6 +92,8 @@ public ContainersMonitorImpl(ContainerExecutor exec,
this.containersToBeAdded = new HashMap<ContainerId, ProcessTreeInfo>();
this.containersToBeRemoved = new ArrayList<ContainerId>();
this.monitoringThread = new MonitoringThread();
this.containersUtilization = ResourceUtilization.newInstance(0, 0, 0.0f);
}
@Override
@ -384,6 +389,11 @@ public void run() {
containersToBeRemoved.clear();
}
// Temporary structure to calculate the total resource utilization of
// the containers
ResourceUtilization trackedContainersUtilization =
ResourceUtilization.newInstance(0, 0, 0.0f);
// Now do the monitoring for the trackingContainers
// Check memory usage and kill any overflowing containers
long vmemUsageByAllContainers = 0;
@ -463,6 +473,12 @@ public void run() {
currentPmemUsage, pmemLimit));
}
// Add resource utilization for this container
trackedContainersUtilization.addTo(
(int) (currentPmemUsage >> 20),
(int) (currentVmemUsage >> 20),
milliVcoresUsed / 1000.0f);
// Add usage to container metrics
if (containerMetricsEnabled) {
ContainerMetrics.forContainer(
@ -542,6 +558,9 @@ && isProcessTreeOverLimit(containerId.toString(),
+ cpuUsagePercentPerCoreByAllContainers);
}
// Save the aggregated utilization of the containers
setContainersUtilization(trackedContainersUtilization);
try {
Thread.sleep(monitoringInterval);
} catch (InterruptedException e) {
@ -613,6 +632,15 @@ public boolean isVmemCheckEnabled() {
return this.vmemCheckEnabled;
}
@Override
public ResourceUtilization getContainersUtilization() {
return this.containersUtilization;
}
public void setContainersUtilization(ResourceUtilization utilization) {
this.containersUtilization = utilization;
}
@Override
public void handle(ContainersMonitorEvent monitoringEvent) {