YARN-2995. Enhance UI to show cluster resource utilization of various container Execution types. (Konstantinos Karanasos via asuresh)
This commit is contained in:
parent
19b3779ae7
commit
0aafc122d4
@ -240,6 +240,8 @@ protected void serviceInit(Configuration conf) throws Exception {
|
||||
this.scheduledRequests.setNumOpportunisticMapsPer100(
|
||||
conf.getInt(MRJobConfig.MR_NUM_OPPORTUNISTIC_MAPS_PER_100,
|
||||
MRJobConfig.DEFAULT_MR_NUM_OPPORTUNISTIC_MAPS_PER_100));
|
||||
LOG.info(this.scheduledRequests.getNumOpportunisticMapsPer100() +
|
||||
"% of the mappers will be scheduled using OPPORTUNISTIC containers");
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -1060,6 +1062,10 @@ void setNumOpportunisticMapsPer100(int numMaps) {
|
||||
this.numOpportunisticMapsPer100 = numMaps;
|
||||
}
|
||||
|
||||
int getNumOpportunisticMapsPer100() {
|
||||
return this.numOpportunisticMapsPer100;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
final LinkedHashMap<TaskAttemptId, ContainerRequest> reduces =
|
||||
new LinkedHashMap<TaskAttemptId, ContainerRequest>();
|
||||
|
@ -36,7 +36,7 @@
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceUtilization;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
|
||||
import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
|
||||
import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode
|
||||
@ -191,7 +191,7 @@ public List<Container> pullNewlyIncreasedContainers() {
|
||||
return null;
|
||||
}
|
||||
|
||||
public QueuedContainersStatus getQueuedContainersStatus() {
|
||||
public OpportunisticContainersStatus getOpportunisticContainersStatus() {
|
||||
return null;
|
||||
}
|
||||
|
||||
|
@ -29,7 +29,7 @@
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceUtilization;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
|
||||
import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
|
||||
import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode
|
||||
@ -180,7 +180,7 @@ public List<Container> pullNewlyIncreasedContainers() {
|
||||
return Collections.EMPTY_LIST;
|
||||
}
|
||||
|
||||
public QueuedContainersStatus getQueuedContainersStatus() {
|
||||
public OpportunisticContainersStatus getOpportunisticContainersStatus() {
|
||||
return null;
|
||||
}
|
||||
|
||||
|
@ -125,10 +125,11 @@ public abstract void setIncreasedContainers(
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
public abstract QueuedContainersStatus getQueuedContainersStatus();
|
||||
public abstract OpportunisticContainersStatus
|
||||
getOpportunisticContainersStatus();
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
public abstract void setQueuedContainersStatus(
|
||||
QueuedContainersStatus queuedContainersStatus);
|
||||
public abstract void setOpportunisticContainersStatus(
|
||||
OpportunisticContainersStatus opportunisticContainersStatus);
|
||||
}
|
||||
|
@ -0,0 +1,152 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.api.records;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
|
||||
/**
|
||||
* <p> <code>OpportunisticContainersStatus</code> captures information
|
||||
* pertaining to the state of execution of the opportunistic containers within a
|
||||
* node. </p>
|
||||
*/
|
||||
@Private
|
||||
@Unstable
|
||||
public abstract class OpportunisticContainersStatus {
|
||||
public static OpportunisticContainersStatus newInstance() {
|
||||
return Records.newRecord(OpportunisticContainersStatus.class);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the number of currently running opportunistic containers on the
|
||||
* node.
|
||||
*
|
||||
* @return number of running opportunistic containers.
|
||||
*/
|
||||
@Private
|
||||
@Unstable
|
||||
public abstract int getRunningOpportContainers();
|
||||
|
||||
/**
|
||||
* Sets the number of running opportunistic containers.
|
||||
*
|
||||
* @param runningOpportContainers number of running opportunistic containers.
|
||||
*/
|
||||
@Private
|
||||
@Unstable
|
||||
public abstract void setRunningOpportContainers(int runningOpportContainers);
|
||||
|
||||
/**
|
||||
* Returns memory currently used on the node for running opportunistic
|
||||
* containers.
|
||||
*
|
||||
* @return memory (in bytes) used for running opportunistic containers.
|
||||
*/
|
||||
@Private
|
||||
@Unstable
|
||||
public abstract long getOpportMemoryUsed();
|
||||
|
||||
/**
|
||||
* Sets the memory used on the node for running opportunistic containers.
|
||||
*
|
||||
* @param opportMemoryUsed memory (in bytes) used for running opportunistic
|
||||
* containers.
|
||||
*/
|
||||
@Private
|
||||
@Unstable
|
||||
public abstract void setOpportMemoryUsed(long opportMemoryUsed);
|
||||
|
||||
/**
|
||||
* Returns CPU cores currently used on the node for running opportunistic
|
||||
* containers.
|
||||
*
|
||||
* @return CPU cores used for running opportunistic containers.
|
||||
*/
|
||||
@Private
|
||||
@Unstable
|
||||
public abstract int getOpportCoresUsed();
|
||||
|
||||
/**
|
||||
* Sets the CPU cores used on the node for running opportunistic containers.
|
||||
*
|
||||
* @param opportCoresUsed memory (in bytes) used for running opportunistic
|
||||
* containers.
|
||||
*/
|
||||
@Private
|
||||
@Unstable
|
||||
public abstract void setOpportCoresUsed(int opportCoresUsed);
|
||||
|
||||
/**
|
||||
* Returns the number of queued opportunistic containers on the node.
|
||||
*
|
||||
* @return number of queued opportunistic containers.
|
||||
*/
|
||||
@Private
|
||||
@Unstable
|
||||
public abstract int getQueuedOpportContainers();
|
||||
|
||||
/**
|
||||
* Sets the number of queued opportunistic containers on the node.
|
||||
*
|
||||
* @param queuedOpportContainers number of queued opportunistic containers.
|
||||
*/
|
||||
@Private
|
||||
@Unstable
|
||||
public abstract void setQueuedOpportContainers(int queuedOpportContainers);
|
||||
|
||||
/**
|
||||
* Returns the length of the containers queue on the node.
|
||||
*
|
||||
* @return length of the containers queue.
|
||||
*/
|
||||
@Private
|
||||
@Unstable
|
||||
public abstract int getWaitQueueLength();
|
||||
|
||||
/**
|
||||
* Sets the length of the containers queue on the node.
|
||||
*
|
||||
* @param waitQueueLength length of the containers queue.
|
||||
*/
|
||||
@Private
|
||||
@Unstable
|
||||
public abstract void setWaitQueueLength(int waitQueueLength);
|
||||
|
||||
/**
|
||||
* Returns the estimated time that a container will have to wait if added to
|
||||
* the queue of the node.
|
||||
*
|
||||
* @return estimated queuing time.
|
||||
*/
|
||||
@Private
|
||||
@Unstable
|
||||
public abstract int getEstimatedQueueWaitTime();
|
||||
|
||||
/**
|
||||
* Sets the estimated time that a container will have to wait if added to the
|
||||
* queue of the node.
|
||||
*
|
||||
* @param queueWaitTime estimated queuing time.
|
||||
*/
|
||||
@Private
|
||||
@Unstable
|
||||
public abstract void setEstimatedQueueWaitTime(int queueWaitTime);
|
||||
|
||||
}
|
@ -1,45 +0,0 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.api.records;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Evolving;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* <code>QueuedContainersStatus</code> captures information pertaining to the
|
||||
* state of execution of the Queueable containers within a node.
|
||||
* </p>
|
||||
*/
|
||||
@Private
|
||||
@Evolving
|
||||
public abstract class QueuedContainersStatus {
|
||||
public static QueuedContainersStatus newInstance() {
|
||||
return Records.newRecord(QueuedContainersStatus.class);
|
||||
}
|
||||
|
||||
public abstract int getEstimatedQueueWaitTime();
|
||||
|
||||
public abstract void setEstimatedQueueWaitTime(int queueWaitTime);
|
||||
|
||||
public abstract int getWaitQueueLength();
|
||||
|
||||
public abstract void setWaitQueueLength(int waitQueueLength);
|
||||
}
|
@ -41,9 +41,9 @@
|
||||
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeHealthStatusProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProtoOrBuilder;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.QueuedContainersStatusProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.OpportunisticContainersStatusProto;
|
||||
|
||||
import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
|
||||
import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
|
||||
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
|
||||
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
|
||||
|
||||
@ -404,25 +404,25 @@ public synchronized void setIncreasedContainers(
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized QueuedContainersStatus getQueuedContainersStatus() {
|
||||
NodeStatusProtoOrBuilder p =
|
||||
this.viaProto ? this.proto : this.builder;
|
||||
if (!p.hasQueuedContainerStatus()) {
|
||||
public synchronized OpportunisticContainersStatus
|
||||
getOpportunisticContainersStatus() {
|
||||
NodeStatusProtoOrBuilder p = this.viaProto ? this.proto : this.builder;
|
||||
if (!p.hasOpportunisticContainersStatus()) {
|
||||
return null;
|
||||
}
|
||||
return convertFromProtoFormat(p.getQueuedContainerStatus());
|
||||
return convertFromProtoFormat(p.getOpportunisticContainersStatus());
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void setQueuedContainersStatus(
|
||||
QueuedContainersStatus queuedContainersStatus) {
|
||||
public synchronized void setOpportunisticContainersStatus(
|
||||
OpportunisticContainersStatus opportunisticContainersStatus) {
|
||||
maybeInitBuilder();
|
||||
if (queuedContainersStatus == null) {
|
||||
this.builder.clearQueuedContainerStatus();
|
||||
if (opportunisticContainersStatus == null) {
|
||||
this.builder.clearOpportunisticContainersStatus();
|
||||
return;
|
||||
}
|
||||
this.builder.setQueuedContainerStatus(
|
||||
convertToProtoFormat(queuedContainersStatus));
|
||||
this.builder.setOpportunisticContainersStatus(
|
||||
convertToProtoFormat(opportunisticContainersStatus));
|
||||
}
|
||||
|
||||
private NodeIdProto convertToProtoFormat(NodeId nodeId) {
|
||||
@ -468,14 +468,14 @@ private ResourceUtilizationPBImpl convertFromProtoFormat(
|
||||
return new ResourceUtilizationPBImpl(p);
|
||||
}
|
||||
|
||||
private QueuedContainersStatusProto convertToProtoFormat(
|
||||
QueuedContainersStatus r) {
|
||||
return ((QueuedContainersStatusPBImpl) r).getProto();
|
||||
private OpportunisticContainersStatusProto convertToProtoFormat(
|
||||
OpportunisticContainersStatus r) {
|
||||
return ((OpportunisticContainersStatusPBImpl) r).getProto();
|
||||
}
|
||||
|
||||
private QueuedContainersStatus convertFromProtoFormat(
|
||||
QueuedContainersStatusProto p) {
|
||||
return new QueuedContainersStatusPBImpl(p);
|
||||
private OpportunisticContainersStatus convertFromProtoFormat(
|
||||
OpportunisticContainersStatusProto p) {
|
||||
return new OpportunisticContainersStatusPBImpl(p);
|
||||
}
|
||||
|
||||
private ContainerPBImpl convertFromProtoFormat(
|
||||
|
@ -0,0 +1,139 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.api.records.impl.pb;
|
||||
|
||||
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos;
|
||||
import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
|
||||
|
||||
/**
|
||||
* Protocol Buffer implementation of OpportunisticContainersStatus.
|
||||
*/
|
||||
public class OpportunisticContainersStatusPBImpl
|
||||
extends OpportunisticContainersStatus {
|
||||
|
||||
private YarnServerCommonProtos.OpportunisticContainersStatusProto proto =
|
||||
YarnServerCommonProtos.OpportunisticContainersStatusProto
|
||||
.getDefaultInstance();
|
||||
private YarnServerCommonProtos.OpportunisticContainersStatusProto.Builder
|
||||
builder = null;
|
||||
private boolean viaProto = false;
|
||||
|
||||
public OpportunisticContainersStatusPBImpl() {
|
||||
builder =
|
||||
YarnServerCommonProtos.OpportunisticContainersStatusProto.newBuilder();
|
||||
}
|
||||
|
||||
public OpportunisticContainersStatusPBImpl(YarnServerCommonProtos
|
||||
.OpportunisticContainersStatusProto proto) {
|
||||
this.proto = proto;
|
||||
viaProto = true;
|
||||
}
|
||||
|
||||
public YarnServerCommonProtos.OpportunisticContainersStatusProto getProto() {
|
||||
proto = viaProto ? proto : builder.build();
|
||||
viaProto = true;
|
||||
return proto;
|
||||
}
|
||||
|
||||
private void maybeInitBuilder() {
|
||||
if (viaProto || builder == null) {
|
||||
builder = YarnServerCommonProtos.OpportunisticContainersStatusProto
|
||||
.newBuilder(proto);
|
||||
}
|
||||
viaProto = false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getRunningOpportContainers() {
|
||||
YarnServerCommonProtos.OpportunisticContainersStatusProtoOrBuilder p =
|
||||
viaProto ? proto : builder;
|
||||
return p.getRunningOpportContainers();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRunningOpportContainers(int runningOpportContainers) {
|
||||
maybeInitBuilder();
|
||||
builder.setRunningOpportContainers(runningOpportContainers);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getOpportMemoryUsed() {
|
||||
YarnServerCommonProtos.OpportunisticContainersStatusProtoOrBuilder p =
|
||||
viaProto ? proto : builder;
|
||||
return p.getOpportMemoryUsed();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setOpportMemoryUsed(long opportMemoryUsed) {
|
||||
maybeInitBuilder();
|
||||
builder.setOpportMemoryUsed(opportMemoryUsed);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getOpportCoresUsed() {
|
||||
YarnServerCommonProtos.OpportunisticContainersStatusProtoOrBuilder p =
|
||||
viaProto ? proto : builder;
|
||||
return p.getOpportCoresUsed();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setOpportCoresUsed(int opportCoresUsed) {
|
||||
maybeInitBuilder();
|
||||
builder.setOpportCoresUsed(opportCoresUsed);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getQueuedOpportContainers() {
|
||||
YarnServerCommonProtos.OpportunisticContainersStatusProtoOrBuilder p =
|
||||
viaProto ? proto : builder;
|
||||
return p.getQueuedOpportContainers();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setQueuedOpportContainers(int queuedOpportContainers) {
|
||||
maybeInitBuilder();
|
||||
builder.setQueuedOpportContainers(queuedOpportContainers);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getWaitQueueLength() {
|
||||
YarnServerCommonProtos.OpportunisticContainersStatusProtoOrBuilder p =
|
||||
viaProto ? proto : builder;
|
||||
return p.getWaitQueueLength();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setWaitQueueLength(int waitQueueLength) {
|
||||
maybeInitBuilder();
|
||||
builder.setWaitQueueLength(waitQueueLength);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getEstimatedQueueWaitTime() {
|
||||
YarnServerCommonProtos.OpportunisticContainersStatusProtoOrBuilder p =
|
||||
viaProto ? proto : builder;
|
||||
return p.getEstimatedQueueWaitTime();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setEstimatedQueueWaitTime(int queueWaitTime) {
|
||||
maybeInitBuilder();
|
||||
builder.setEstimatedQueueWaitTime(queueWaitTime);
|
||||
}
|
||||
}
|
@ -1,84 +0,0 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.api.records.impl.pb;
|
||||
|
||||
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos;
|
||||
import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
|
||||
|
||||
/**
|
||||
* Protocol Buffer implementation of QueuedContainersStatus.
|
||||
*/
|
||||
public class QueuedContainersStatusPBImpl extends QueuedContainersStatus {
|
||||
|
||||
private YarnServerCommonProtos.QueuedContainersStatusProto proto =
|
||||
YarnServerCommonProtos.QueuedContainersStatusProto.getDefaultInstance();
|
||||
private YarnServerCommonProtos.QueuedContainersStatusProto.Builder builder =
|
||||
null;
|
||||
private boolean viaProto = false;
|
||||
|
||||
public QueuedContainersStatusPBImpl() {
|
||||
builder = YarnServerCommonProtos.QueuedContainersStatusProto.newBuilder();
|
||||
}
|
||||
|
||||
public QueuedContainersStatusPBImpl(YarnServerCommonProtos
|
||||
.QueuedContainersStatusProto proto) {
|
||||
this.proto = proto;
|
||||
viaProto = true;
|
||||
}
|
||||
|
||||
public YarnServerCommonProtos.QueuedContainersStatusProto getProto() {
|
||||
proto = viaProto ? proto : builder.build();
|
||||
viaProto = true;
|
||||
return proto;
|
||||
}
|
||||
|
||||
private void maybeInitBuilder() {
|
||||
if (viaProto || builder == null) {
|
||||
builder =
|
||||
YarnServerCommonProtos.QueuedContainersStatusProto.newBuilder(proto);
|
||||
}
|
||||
viaProto = false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getEstimatedQueueWaitTime() {
|
||||
YarnServerCommonProtos.QueuedContainersStatusProtoOrBuilder p =
|
||||
viaProto ? proto : builder;
|
||||
return p.getEstimatedQueueWaitTime();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setEstimatedQueueWaitTime(int queueWaitTime) {
|
||||
maybeInitBuilder();
|
||||
builder.setEstimatedQueueWaitTime(queueWaitTime);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getWaitQueueLength() {
|
||||
YarnServerCommonProtos.QueuedContainersStatusProtoOrBuilder p =
|
||||
viaProto ? proto : builder;
|
||||
return p.getWaitQueueLength();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setWaitQueueLength(int waitQueueLength) {
|
||||
maybeInitBuilder();
|
||||
builder.setWaitQueueLength(waitQueueLength);
|
||||
}
|
||||
}
|
@ -39,12 +39,16 @@ message NodeStatusProto {
|
||||
optional ResourceUtilizationProto containers_utilization = 6;
|
||||
optional ResourceUtilizationProto node_utilization = 7;
|
||||
repeated ContainerProto increased_containers = 8;
|
||||
optional QueuedContainersStatusProto queued_container_status = 9;
|
||||
optional OpportunisticContainersStatusProto opportunistic_containers_status = 9;
|
||||
}
|
||||
|
||||
message QueuedContainersStatusProto {
|
||||
optional int32 estimated_queue_wait_time = 1;
|
||||
optional int32 wait_queue_length = 2;
|
||||
message OpportunisticContainersStatusProto {
|
||||
optional int32 running_opport_containers = 1;
|
||||
optional int64 opport_memory_used = 2;
|
||||
optional int32 opport_cores_used = 3;
|
||||
optional int32 queued_opport_containers = 4;
|
||||
optional int32 wait_queue_length = 5;
|
||||
optional int32 estimated_queue_wait_time = 6;
|
||||
}
|
||||
|
||||
message MasterKeyProto {
|
||||
|
@ -48,7 +48,7 @@
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterNodeManagerRequestPBImpl;
|
||||
|
||||
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
|
||||
import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
|
||||
import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
@ -146,11 +146,11 @@ public void testNodeHeartBeatRequest() throws IOException {
|
||||
Records.newRecord(NodeHeartbeatRequest.class);
|
||||
NodeStatus nodeStatus =
|
||||
Records.newRecord(NodeStatus.class);
|
||||
QueuedContainersStatus queuedContainersStatus = Records.newRecord
|
||||
(QueuedContainersStatus.class);
|
||||
queuedContainersStatus.setEstimatedQueueWaitTime(123);
|
||||
queuedContainersStatus.setWaitQueueLength(321);
|
||||
nodeStatus.setQueuedContainersStatus(queuedContainersStatus);
|
||||
OpportunisticContainersStatus opportunisticContainersStatus =
|
||||
Records.newRecord(OpportunisticContainersStatus.class);
|
||||
opportunisticContainersStatus.setEstimatedQueueWaitTime(123);
|
||||
opportunisticContainersStatus.setWaitQueueLength(321);
|
||||
nodeStatus.setOpportunisticContainersStatus(opportunisticContainersStatus);
|
||||
record.setNodeStatus(nodeStatus);
|
||||
|
||||
NodeHeartbeatRequestPBImpl pb = new
|
||||
@ -159,9 +159,10 @@ public void testNodeHeartBeatRequest() throws IOException {
|
||||
|
||||
Assert.assertEquals(123,
|
||||
pb.getNodeStatus()
|
||||
.getQueuedContainersStatus().getEstimatedQueueWaitTime());
|
||||
.getOpportunisticContainersStatus().getEstimatedQueueWaitTime());
|
||||
Assert.assertEquals(321,
|
||||
pb.getNodeStatus().getQueuedContainersStatus().getWaitQueueLength());
|
||||
pb.getNodeStatus().getOpportunisticContainersStatus()
|
||||
.getWaitQueueLength());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -75,7 +75,7 @@
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerRequest;
|
||||
|
||||
import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit;
|
||||
import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
|
||||
import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
|
||||
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
||||
import org.apache.hadoop.yarn.server.api.records.NodeAction;
|
||||
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
|
||||
@ -465,16 +465,21 @@ protected NodeStatus getNodeStatus(int responseId) throws IOException {
|
||||
createKeepAliveApplicationList(), nodeHealthStatus,
|
||||
containersUtilization, nodeUtilization, increasedContainers);
|
||||
|
||||
nodeStatus.setQueuedContainersStatus(getQueuedContainerStatus());
|
||||
nodeStatus.setOpportunisticContainersStatus(
|
||||
getOpportunisticContainersStatus());
|
||||
return nodeStatus;
|
||||
}
|
||||
|
||||
private QueuedContainersStatus getQueuedContainerStatus() {
|
||||
QueuedContainersStatus status = QueuedContainersStatus.newInstance();
|
||||
status.setWaitQueueLength(
|
||||
this.context.getQueuingContext().getQueuedContainers().size());
|
||||
/**
|
||||
* Get the status of the OPPORTUNISTIC containers.
|
||||
* @return the status of the OPPORTUNISTIC containers.
|
||||
*/
|
||||
private OpportunisticContainersStatus getOpportunisticContainersStatus() {
|
||||
OpportunisticContainersStatus status =
|
||||
this.context.getContainerManager().getOpportunisticContainersStatus();
|
||||
return status;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the aggregated utilization of the containers in this node.
|
||||
* @return Resource utilization of all the containers.
|
||||
|
@ -22,6 +22,7 @@
|
||||
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
|
||||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit;
|
||||
import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.ContainerManagerEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor
|
||||
.ContainersMonitor;
|
||||
@ -35,6 +36,8 @@ public interface ContainerManager extends ServiceStateChangeListener,
|
||||
|
||||
ContainersMonitor getContainersMonitor();
|
||||
|
||||
OpportunisticContainersStatus getOpportunisticContainersStatus();
|
||||
|
||||
void updateQueuingLimit(ContainerQueuingLimit queuingLimit);
|
||||
|
||||
void setBlockNewContainerRequests(boolean blockNewContainerRequests);
|
||||
|
@ -88,6 +88,7 @@
|
||||
import org.apache.hadoop.yarn.security.NMTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.server.api.ContainerType;
|
||||
import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit;
|
||||
import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedContainersEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.CMgrDecreaseContainersResourceEvent;
|
||||
@ -1520,9 +1521,14 @@ protected boolean isServiceStopped() {
|
||||
return serviceStopped;
|
||||
}
|
||||
|
||||
@Override
|
||||
public OpportunisticContainersStatus getOpportunisticContainersStatus() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void updateQueuingLimit(ContainerQueuingLimit queuingLimit) {
|
||||
LOG.trace("Implementation does not support queuing of Containers !!");
|
||||
LOG.trace("Implementation does not support queuing of Containers!!");
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
|
@ -304,21 +304,21 @@ void setProcessTree(ResourceCalculatorProcessTree mypTree) {
|
||||
/**
|
||||
* @return Virtual memory limit for the process tree in bytes
|
||||
*/
|
||||
synchronized long getVmemLimit() {
|
||||
public synchronized long getVmemLimit() {
|
||||
return this.vmemLimit;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Physical memory limit for the process tree in bytes
|
||||
*/
|
||||
synchronized long getPmemLimit() {
|
||||
public synchronized long getPmemLimit() {
|
||||
return this.pmemLimit;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Number of cpu vcores assigned
|
||||
*/
|
||||
synchronized int getCpuVcores() {
|
||||
public synchronized int getCpuVcores() {
|
||||
return this.cpuVcores;
|
||||
}
|
||||
|
||||
|
@ -46,6 +46,7 @@
|
||||
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.security.NMTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit;
|
||||
import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
|
||||
@ -80,10 +81,14 @@ public class QueuingContainerManagerImpl extends ContainerManagerImpl {
|
||||
private ConcurrentMap<ContainerId, AllocatedContainerInfo>
|
||||
allocatedOpportunisticContainers;
|
||||
|
||||
private long allocatedMemoryOpportunistic;
|
||||
private int allocatedVCoresOpportunistic;
|
||||
|
||||
private Queue<AllocatedContainerInfo> queuedGuaranteedContainers;
|
||||
private Queue<AllocatedContainerInfo> queuedOpportunisticContainers;
|
||||
|
||||
private Set<ContainerId> opportunisticContainersToKill;
|
||||
private final OpportunisticContainersStatus opportunisticContainersStatus;
|
||||
private final ContainerQueuingLimit queuingLimit;
|
||||
|
||||
public QueuingContainerManagerImpl(Context context, ContainerExecutor exec,
|
||||
@ -93,10 +98,14 @@ public QueuingContainerManagerImpl(Context context, ContainerExecutor exec,
|
||||
dirsHandler);
|
||||
this.allocatedGuaranteedContainers = new ConcurrentHashMap<>();
|
||||
this.allocatedOpportunisticContainers = new ConcurrentHashMap<>();
|
||||
this.allocatedMemoryOpportunistic = 0;
|
||||
this.allocatedVCoresOpportunistic = 0;
|
||||
this.queuedGuaranteedContainers = new ConcurrentLinkedQueue<>();
|
||||
this.queuedOpportunisticContainers = new ConcurrentLinkedQueue<>();
|
||||
this.opportunisticContainersToKill = Collections.synchronizedSet(
|
||||
new HashSet<ContainerId>());
|
||||
this.opportunisticContainersStatus =
|
||||
OpportunisticContainersStatus.newInstance();
|
||||
this.queuingLimit = ContainerQueuingLimit.newInstance();
|
||||
}
|
||||
|
||||
@ -196,6 +205,8 @@ private void startAllocatedContainer(
|
||||
} else {
|
||||
allocatedOpportunisticContainers.put(pti.getContainerId(),
|
||||
allocatedContainerInfo);
|
||||
allocatedMemoryOpportunistic += pti.getPmemLimit();
|
||||
allocatedVCoresOpportunistic += pti.getCpuVcores();
|
||||
}
|
||||
|
||||
getContainersMonitor().increaseContainersAllocation(pti);
|
||||
@ -267,6 +278,11 @@ private void removeAllocatedContainer(ContainerId containerId) {
|
||||
if (contToRemove != null) {
|
||||
getContainersMonitor().decreaseContainersAllocation(contToRemove
|
||||
.getPti());
|
||||
|
||||
if (contToRemove.getExecutionType() == ExecutionType.OPPORTUNISTIC) {
|
||||
allocatedMemoryOpportunistic -= contToRemove.getPti().getPmemLimit();
|
||||
allocatedVCoresOpportunistic -= contToRemove.getPti().getCpuVcores();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -556,6 +572,22 @@ public void handle(ApplicationEvent event) {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public OpportunisticContainersStatus getOpportunisticContainersStatus() {
|
||||
opportunisticContainersStatus
|
||||
.setRunningOpportContainers(allocatedOpportunisticContainers.size());
|
||||
opportunisticContainersStatus
|
||||
.setOpportMemoryUsed(allocatedMemoryOpportunistic);
|
||||
opportunisticContainersStatus
|
||||
.setOpportCoresUsed(allocatedVCoresOpportunistic);
|
||||
opportunisticContainersStatus
|
||||
.setQueuedOpportContainers(queuedOpportunisticContainers.size());
|
||||
opportunisticContainersStatus.setWaitQueueLength(
|
||||
queuedGuaranteedContainers.size() +
|
||||
queuedOpportunisticContainers.size());
|
||||
return opportunisticContainersStatus;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void updateQueuingLimit(ContainerQueuingLimit limit) {
|
||||
this.queuingLimit.setMaxQueueLength(limit.getMaxQueueLength());
|
||||
|
@ -86,6 +86,7 @@ protected void render(Block html) {
|
||||
._("User", info.getUser())
|
||||
._("TotalMemoryNeeded", info.getMemoryNeeded())
|
||||
._("TotalVCoresNeeded", info.getVCoresNeeded())
|
||||
._("ExecutionType", info.getExecutionType())
|
||||
._("logs", info.getShortLogLink(), "Link to logs");
|
||||
html._(InfoBlock.class);
|
||||
}
|
||||
|
@ -46,6 +46,7 @@ public class ContainerInfo {
|
||||
protected String user;
|
||||
protected long totalMemoryNeededMB;
|
||||
protected long totalVCoresNeeded;
|
||||
private String executionType;
|
||||
protected String containerLogsLink;
|
||||
protected String nodeId;
|
||||
@XmlTransient
|
||||
@ -84,6 +85,8 @@ public ContainerInfo(final Context nmContext, final Container container,
|
||||
this.totalMemoryNeededMB = res.getMemorySize();
|
||||
this.totalVCoresNeeded = res.getVirtualCores();
|
||||
}
|
||||
this.executionType =
|
||||
container.getContainerTokenIdentifier().getExecutionType().name();
|
||||
this.containerLogsShortLink = ujoin("containerlogs", this.id,
|
||||
container.getUser());
|
||||
|
||||
@ -143,6 +146,10 @@ public long getVCoresNeeded() {
|
||||
return this.totalVCoresNeeded;
|
||||
}
|
||||
|
||||
public String getExecutionType() {
|
||||
return this.executionType;
|
||||
}
|
||||
|
||||
public List<String> getContainerLogFiles() {
|
||||
return this.containerLogFiles;
|
||||
}
|
||||
|
@ -515,7 +515,7 @@ public void verifyContainersInfoXML(NodeList nodes, Container cont)
|
||||
|
||||
public void verifyNodeContainerInfo(JSONObject info, Container cont)
|
||||
throws JSONException, Exception {
|
||||
assertEquals("incorrect number of elements", 10, info.length());
|
||||
assertEquals("incorrect number of elements", 11, info.length());
|
||||
|
||||
verifyNodeContainerInfoGeneric(cont, info.getString("id"),
|
||||
info.getString("state"), info.getString("user"),
|
||||
|
@ -81,8 +81,9 @@
|
||||
|
||||
/**
|
||||
* The OpportunisticContainerAllocatorAMService is started instead of the
|
||||
* ApplicationMasterService if distributed scheduling is enabled for the YARN
|
||||
* cluster.
|
||||
* ApplicationMasterService if opportunistic scheduling is enabled for the YARN
|
||||
* cluster (either centralized or distributed opportunistic scheduling).
|
||||
*
|
||||
* It extends the functionality of the ApplicationMasterService by servicing
|
||||
* clients (AMs and AMRMProxy request interceptors) that understand the
|
||||
* DistributedSchedulingProtocol.
|
||||
|
@ -31,7 +31,7 @@
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceUtilization;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
|
||||
import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
|
||||
import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
|
||||
|
||||
/**
|
||||
* Node managers information on available resources
|
||||
@ -170,7 +170,7 @@ public void updateNodeHeartbeatResponseForContainersDecreasing(
|
||||
|
||||
public List<Container> pullNewlyIncreasedContainers();
|
||||
|
||||
QueuedContainersStatus getQueuedContainersStatus();
|
||||
OpportunisticContainersStatus getOpportunisticContainersStatus();
|
||||
|
||||
long getUntrackedTimeStamp();
|
||||
|
||||
|
@ -61,7 +61,7 @@
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
|
||||
import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
|
||||
import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
|
||||
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEvent;
|
||||
@ -134,7 +134,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
|
||||
private ResourceUtilization nodeUtilization;
|
||||
|
||||
/* Container Queue Information for the node.. Used by Distributed Scheduler */
|
||||
private QueuedContainersStatus queuedContainersStatus;
|
||||
private OpportunisticContainersStatus opportunisticContainersStatus;
|
||||
|
||||
private final ContainerAllocationExpirer containerAllocationExpirer;
|
||||
/* set of containers that have just launched */
|
||||
@ -1169,7 +1169,8 @@ public static class StatusUpdateWhenHealthyTransition implements
|
||||
public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) {
|
||||
|
||||
RMNodeStatusEvent statusEvent = (RMNodeStatusEvent) event;
|
||||
rmNode.setQueuedContainersStatus(statusEvent.getContainerQueueInfo());
|
||||
rmNode.setOpportunisticContainersStatus(
|
||||
statusEvent.getOpportunisticContainersStatus());
|
||||
NodeHealthStatus remoteNodeHealthStatus = updateRMNodeFromStatusEvents(
|
||||
rmNode, statusEvent);
|
||||
NodeState initialState = rmNode.getState();
|
||||
@ -1480,22 +1481,22 @@ public Resource getOriginalTotalCapability() {
|
||||
return this.originalTotalCapability;
|
||||
}
|
||||
|
||||
public QueuedContainersStatus getQueuedContainersStatus() {
|
||||
public OpportunisticContainersStatus getOpportunisticContainersStatus() {
|
||||
this.readLock.lock();
|
||||
|
||||
try {
|
||||
return this.queuedContainersStatus;
|
||||
return this.opportunisticContainersStatus;
|
||||
} finally {
|
||||
this.readLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
public void setQueuedContainersStatus(QueuedContainersStatus
|
||||
queuedContainersStatus) {
|
||||
public void setOpportunisticContainersStatus(
|
||||
OpportunisticContainersStatus opportunisticContainersStatus) {
|
||||
this.writeLock.lock();
|
||||
|
||||
try {
|
||||
this.queuedContainersStatus = queuedContainersStatus;
|
||||
this.opportunisticContainersStatus = opportunisticContainersStatus;
|
||||
} finally {
|
||||
this.writeLock.unlock();
|
||||
}
|
||||
|
@ -28,7 +28,7 @@
|
||||
import org.apache.hadoop.yarn.api.records.ResourceUtilization;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
|
||||
import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
|
||||
import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
|
||||
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
|
||||
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
|
||||
|
||||
@ -80,8 +80,8 @@ public List<LogAggregationReport> getLogAggregationReportsForApps() {
|
||||
return this.logAggregationReportsForApps;
|
||||
}
|
||||
|
||||
public QueuedContainersStatus getContainerQueueInfo() {
|
||||
return this.nodeStatus.getQueuedContainersStatus();
|
||||
public OpportunisticContainersStatus getOpportunisticContainersStatus() {
|
||||
return this.nodeStatus.getOpportunisticContainersStatus();
|
||||
}
|
||||
|
||||
public void setLogAggregationReportsForApps(
|
||||
|
@ -43,6 +43,7 @@
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||
import org.apache.hadoop.yarn.api.records.ExecutionType;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.NodeState;
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
@ -570,7 +571,7 @@ public void completedContainer(RMContainer rmContainer,
|
||||
return;
|
||||
}
|
||||
|
||||
if (!rmContainer.isRemotelyAllocated()) {
|
||||
if (rmContainer.getExecutionType() == ExecutionType.GUARANTEED) {
|
||||
completedContainerInternal(rmContainer, containerStatus, event);
|
||||
} else {
|
||||
ContainerId containerId = rmContainer.getContainerId();
|
||||
|
@ -24,7 +24,7 @@
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceOption;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
|
||||
import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
|
||||
import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.ClusterMonitor;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||
|
||||
@ -195,11 +195,11 @@ public void removeNode(RMNode removedRMNode) {
|
||||
@Override
|
||||
public void updateNode(RMNode rmNode) {
|
||||
LOG.debug("Node update event from: " + rmNode.getNodeID());
|
||||
QueuedContainersStatus queuedContainersStatus =
|
||||
rmNode.getQueuedContainersStatus();
|
||||
OpportunisticContainersStatus opportunisticContainersStatus =
|
||||
rmNode.getOpportunisticContainersStatus();
|
||||
int estimatedQueueWaitTime =
|
||||
queuedContainersStatus.getEstimatedQueueWaitTime();
|
||||
int waitQueueLength = queuedContainersStatus.getWaitQueueLength();
|
||||
opportunisticContainersStatus.getEstimatedQueueWaitTime();
|
||||
int waitQueueLength = opportunisticContainersStatus.getWaitQueueLength();
|
||||
// Add nodes to clusterNodes. If estimatedQueueTime is -1, ignore node
|
||||
// UNLESS comparator is based on queue length.
|
||||
ReentrantReadWriteLock.WriteLock writeLock = clusterNodesLock.writeLock();
|
||||
|
@ -18,17 +18,10 @@
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.webapp;
|
||||
|
||||
import static org.apache.hadoop.yarn.webapp.YarnWebParams.NODE_LABEL;
|
||||
import static org.apache.hadoop.yarn.webapp.YarnWebParams.NODE_STATE;
|
||||
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.DATATABLES;
|
||||
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.DATATABLES_ID;
|
||||
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.initID;
|
||||
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.tableInit;
|
||||
|
||||
import java.util.Collection;
|
||||
|
||||
import com.google.inject.Inject;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.hadoop.yarn.api.records.NodeState;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||
@ -42,18 +35,29 @@
|
||||
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TBODY;
|
||||
import org.apache.hadoop.yarn.webapp.view.HtmlBlock;
|
||||
|
||||
import com.google.inject.Inject;
|
||||
import java.util.Collection;
|
||||
|
||||
import static org.apache.hadoop.yarn.webapp.YarnWebParams.NODE_LABEL;
|
||||
import static org.apache.hadoop.yarn.webapp.YarnWebParams.NODE_STATE;
|
||||
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.DATATABLES;
|
||||
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.DATATABLES_ID;
|
||||
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.initID;
|
||||
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.tableInit;
|
||||
|
||||
class NodesPage extends RmView {
|
||||
|
||||
static class NodesBlock extends HtmlBlock {
|
||||
final ResourceManager rm;
|
||||
private static final long BYTES_IN_MB = 1024 * 1024;
|
||||
private static boolean opportunisticContainersEnabled;
|
||||
|
||||
@Inject
|
||||
NodesBlock(ResourceManager rm, ViewContext ctx) {
|
||||
super(ctx);
|
||||
this.rm = rm;
|
||||
this.opportunisticContainersEnabled = YarnConfiguration
|
||||
.isOpportunisticContainerAllocationEnabled(
|
||||
this.rm.getRMContext().getYarnConfiguration());
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -61,9 +65,10 @@ protected void render(Block html) {
|
||||
html._(MetricsOverviewTable.class);
|
||||
|
||||
ResourceScheduler sched = rm.getResourceScheduler();
|
||||
|
||||
String type = $(NODE_STATE);
|
||||
String labelFilter = $(NODE_LABEL, CommonNodeLabelsManager.ANY).trim();
|
||||
TBODY<TABLE<Hamlet>> tbody =
|
||||
Hamlet.TR<Hamlet.THEAD<TABLE<Hamlet>>> trbody =
|
||||
html.table("#nodes").thead().tr()
|
||||
.th(".nodelabels", "Node Labels")
|
||||
.th(".rack", "Rack")
|
||||
@ -71,13 +76,29 @@ protected void render(Block html) {
|
||||
.th(".nodeaddress", "Node Address")
|
||||
.th(".nodehttpaddress", "Node HTTP Address")
|
||||
.th(".lastHealthUpdate", "Last health-update")
|
||||
.th(".healthReport", "Health-report")
|
||||
.th(".containers", "Containers")
|
||||
.th(".mem", "Mem Used")
|
||||
.th(".mem", "Mem Avail")
|
||||
.th(".vcores", "VCores Used")
|
||||
.th(".vcores", "VCores Avail")
|
||||
.th(".nodeManagerVersion", "Version")._()._().tbody();
|
||||
.th(".healthReport", "Health-report");
|
||||
|
||||
if (!this.opportunisticContainersEnabled) {
|
||||
trbody.th(".containers", "Containers")
|
||||
.th(".mem", "Mem Used")
|
||||
.th(".mem", "Mem Avail")
|
||||
.th(".vcores", "VCores Used")
|
||||
.th(".vcores", "VCores Avail");
|
||||
} else {
|
||||
trbody.th(".containers", "Running Containers (G)")
|
||||
.th(".mem", "Mem Used (G)")
|
||||
.th(".mem", "Mem Avail (G)")
|
||||
.th(".vcores", "VCores Used (G)")
|
||||
.th(".vcores", "VCores Avail (G)")
|
||||
.th(".containers", "Running Containers (O)")
|
||||
.th(".mem", "Mem Used (O)")
|
||||
.th(".vcores", "VCores Used (O)")
|
||||
.th(".containers", "Queued Containers");
|
||||
}
|
||||
|
||||
TBODY<TABLE<Hamlet>> tbody =
|
||||
trbody.th(".nodeManagerVersion", "Version")._()._().tbody();
|
||||
|
||||
NodeState stateFilter = null;
|
||||
if (type != null && !type.isEmpty()) {
|
||||
stateFilter = NodeState.valueOf(StringUtils.toUpperCase(type));
|
||||
@ -153,7 +174,23 @@ protected void render(Block html) {
|
||||
.append("\",\"").append(String.valueOf(info.getUsedVirtualCores()))
|
||||
.append("\",\"")
|
||||
.append(String.valueOf(info.getAvailableVirtualCores()))
|
||||
.append("\",\"").append(ni.getNodeManagerVersion())
|
||||
.append("\",\"");
|
||||
|
||||
// If opportunistic containers are enabled, add extra fields.
|
||||
if (this.opportunisticContainersEnabled) {
|
||||
nodeTableData
|
||||
.append(String.valueOf(info.getNumRunningOpportContainers()))
|
||||
.append("\",\"").append("<br title='")
|
||||
.append(String.valueOf(info.getUsedMemoryOpport())).append("'>")
|
||||
.append(StringUtils.byteDesc(info.getUsedMemoryOpport()))
|
||||
.append("\",\"")
|
||||
.append(String.valueOf(info.getUsedVirtualCoresOpport()))
|
||||
.append("\",\"")
|
||||
.append(String.valueOf(info.getNumQueuedContainers()))
|
||||
.append("\",\"");
|
||||
}
|
||||
|
||||
nodeTableData.append(ni.getNodeManagerVersion())
|
||||
.append("\"],\n");
|
||||
}
|
||||
if (nodeTableData.charAt(nodeTableData.length() - 2) == ',') {
|
||||
|
@ -28,6 +28,7 @@
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.NodeState;
|
||||
import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
|
||||
@ -49,6 +50,10 @@ public class NodeInfo {
|
||||
protected long availMemoryMB;
|
||||
protected long usedVirtualCores;
|
||||
protected long availableVirtualCores;
|
||||
private int numRunningOpportContainers;
|
||||
private long usedMemoryOpport; // Memory in bytes.
|
||||
private long usedVirtualCoresOpport;
|
||||
private int numQueuedContainers;
|
||||
protected ArrayList<String> nodeLabels = new ArrayList<String>();
|
||||
protected ResourceUtilizationInfo resourceUtilization;
|
||||
|
||||
@ -66,7 +71,8 @@ public NodeInfo(RMNode ni, ResourceScheduler sched) {
|
||||
this.usedMemoryMB = report.getUsedResource().getMemorySize();
|
||||
this.availMemoryMB = report.getAvailableResource().getMemorySize();
|
||||
this.usedVirtualCores = report.getUsedResource().getVirtualCores();
|
||||
this.availableVirtualCores = report.getAvailableResource().getVirtualCores();
|
||||
this.availableVirtualCores =
|
||||
report.getAvailableResource().getVirtualCores();
|
||||
}
|
||||
this.id = id.toString();
|
||||
this.rack = ni.getRackName();
|
||||
@ -76,7 +82,22 @@ public NodeInfo(RMNode ni, ResourceScheduler sched) {
|
||||
this.lastHealthUpdate = ni.getLastHealthReportTime();
|
||||
this.healthReport = String.valueOf(ni.getHealthReport());
|
||||
this.version = ni.getNodeManagerVersion();
|
||||
|
||||
|
||||
// Status of opportunistic containers.
|
||||
this.numRunningOpportContainers = 0;
|
||||
this.usedMemoryOpport = 0;
|
||||
this.usedVirtualCoresOpport = 0;
|
||||
this.numQueuedContainers = 0;
|
||||
OpportunisticContainersStatus opportStatus =
|
||||
ni.getOpportunisticContainersStatus();
|
||||
if (opportStatus != null) {
|
||||
this.numRunningOpportContainers =
|
||||
opportStatus.getRunningOpportContainers();
|
||||
this.usedMemoryOpport = opportStatus.getOpportMemoryUsed();
|
||||
this.usedVirtualCoresOpport = opportStatus.getOpportCoresUsed();
|
||||
this.numQueuedContainers = opportStatus.getQueuedOpportContainers();
|
||||
}
|
||||
|
||||
// add labels
|
||||
Set<String> labelSet = ni.getNodeLabels();
|
||||
if (labelSet != null) {
|
||||
@ -140,6 +161,22 @@ public long getAvailableVirtualCores() {
|
||||
return this.availableVirtualCores;
|
||||
}
|
||||
|
||||
public int getNumRunningOpportContainers() {
|
||||
return numRunningOpportContainers;
|
||||
}
|
||||
|
||||
public long getUsedMemoryOpport() {
|
||||
return usedMemoryOpport;
|
||||
}
|
||||
|
||||
public long getUsedVirtualCoresOpport() {
|
||||
return usedVirtualCoresOpport;
|
||||
}
|
||||
|
||||
public int getNumQueuedContainers() {
|
||||
return numQueuedContainers;
|
||||
}
|
||||
|
||||
public ArrayList<String> getNodeLabels() {
|
||||
return this.nodeLabels;
|
||||
}
|
||||
|
@ -35,7 +35,7 @@
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
|
||||
import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
|
||||
import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
|
||||
|
||||
@ -260,7 +260,7 @@ public ResourceUtilization getNodeUtilization() {
|
||||
return this.nodeUtilization;
|
||||
}
|
||||
|
||||
public QueuedContainersStatus getQueuedContainersStatus() {
|
||||
public OpportunisticContainersStatus getOpportunisticContainersStatus() {
|
||||
return null;
|
||||
}
|
||||
|
||||
|
@ -20,7 +20,7 @@
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit;
|
||||
import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
|
||||
import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
@ -183,13 +183,13 @@ private RMNode createRMNode(String host, int port,
|
||||
RMNode node1 = Mockito.mock(RMNode.class);
|
||||
NodeId nID1 = new FakeNodeId(host, port);
|
||||
Mockito.when(node1.getNodeID()).thenReturn(nID1);
|
||||
QueuedContainersStatus status1 =
|
||||
Mockito.mock(QueuedContainersStatus.class);
|
||||
OpportunisticContainersStatus status1 =
|
||||
Mockito.mock(OpportunisticContainersStatus.class);
|
||||
Mockito.when(status1.getEstimatedQueueWaitTime())
|
||||
.thenReturn(waitTime);
|
||||
Mockito.when(status1.getWaitQueueLength())
|
||||
.thenReturn(queueLength);
|
||||
Mockito.when(node1.getQueuedContainersStatus()).thenReturn(status1);
|
||||
Mockito.when(node1.getOpportunisticContainersStatus()).thenReturn(status1);
|
||||
return node1;
|
||||
}
|
||||
}
|
||||
|
@ -21,6 +21,7 @@
|
||||
import java.io.PrintWriter;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.NodeState;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.NodesPage.NodesBlock;
|
||||
@ -49,6 +50,7 @@ public class TestNodesPage {
|
||||
// future. In that case this value should be adjusted to the new value.
|
||||
final int numberOfThInMetricsTable = 23;
|
||||
final int numberOfActualTableHeaders = 13;
|
||||
private final int numberOfThForOpportunisticContainers = 4;
|
||||
|
||||
private Injector injector;
|
||||
|
||||
@ -135,4 +137,35 @@ public void testNodesBlockRenderForNodeLabelFilterWithAnyLabel() {
|
||||
Mockito.verify(writer, Mockito.times(numberOfThInMetricsTable))
|
||||
.print("<td");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNodesBlockRenderForOpportunisticContainers() {
|
||||
final RMContext mockRMContext =
|
||||
TestRMWebApp.mockRMContext(3, numberOfRacks, numberOfNodesPerRack,
|
||||
8 * TestRMWebApp.GiB);
|
||||
mockRMContext.getYarnConfiguration().setBoolean(
|
||||
YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true);
|
||||
injector =
|
||||
WebAppTests.createMockInjector(RMContext.class, mockRMContext,
|
||||
new Module() {
|
||||
@Override
|
||||
public void configure(Binder binder) {
|
||||
try {
|
||||
binder.bind(ResourceManager.class).toInstance(
|
||||
TestRMWebApp.mockRm(mockRMContext));
|
||||
} catch (IOException e) {
|
||||
throw new IllegalStateException(e);
|
||||
}
|
||||
}
|
||||
});
|
||||
injector.getInstance(NodesBlock.class).render();
|
||||
PrintWriter writer = injector.getInstance(PrintWriter.class);
|
||||
WebAppTests.flushOutput(injector);
|
||||
|
||||
Mockito.verify(writer, Mockito.times(
|
||||
numberOfActualTableHeaders + numberOfThInMetricsTable +
|
||||
numberOfThForOpportunisticContainers)).print("<th");
|
||||
Mockito.verify(writer, Mockito.times(numberOfThInMetricsTable))
|
||||
.print("<td");
|
||||
}
|
||||
}
|
||||
|
@ -200,6 +200,7 @@ public ConcurrentMap<NodeId, RMNode> getRMNodes() {
|
||||
}
|
||||
};
|
||||
rmContext.setNodeLabelManager(new NullRMNodeLabelsManager());
|
||||
rmContext.setYarnConfiguration(new YarnConfiguration());
|
||||
return rmContext;
|
||||
}
|
||||
|
||||
|
@ -40,6 +40,7 @@
|
||||
import org.apache.hadoop.yarn.api.records.ResourceUtilization;
|
||||
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
|
||||
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
|
||||
import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||
@ -722,13 +723,17 @@ public void verifyNodesXML(NodeList nodes, RMNode nm)
|
||||
"aggregatedContainersPhysicalMemoryMB"),
|
||||
WebServicesTestUtils.getXmlInt(element,
|
||||
"aggregatedContainersVirtualMemoryMB"),
|
||||
WebServicesTestUtils.getXmlFloat(element, "containersCPUUsage"));
|
||||
WebServicesTestUtils.getXmlFloat(element, "containersCPUUsage"),
|
||||
WebServicesTestUtils.getXmlInt(element, "numRunningOpportContainers"),
|
||||
WebServicesTestUtils.getXmlLong(element, "usedMemoryOpport"),
|
||||
WebServicesTestUtils.getXmlInt(element, "usedVirtualCoresOpport"),
|
||||
WebServicesTestUtils.getXmlInt(element, "numQueuedContainers"));
|
||||
}
|
||||
}
|
||||
|
||||
public void verifyNodeInfo(JSONObject nodeInfo, RMNode nm)
|
||||
throws JSONException, Exception {
|
||||
assertEquals("incorrect number of elements", 14, nodeInfo.length());
|
||||
assertEquals("incorrect number of elements", 18, nodeInfo.length());
|
||||
|
||||
JSONObject resourceInfo = nodeInfo.getJSONObject("resourceUtilization");
|
||||
verifyNodeInfoGeneric(nm, nodeInfo.getString("state"),
|
||||
@ -745,21 +750,29 @@ public void verifyNodeInfo(JSONObject nodeInfo, RMNode nm)
|
||||
resourceInfo.getDouble("nodeCPUUsage"),
|
||||
resourceInfo.getInt("aggregatedContainersPhysicalMemoryMB"),
|
||||
resourceInfo.getInt("aggregatedContainersVirtualMemoryMB"),
|
||||
resourceInfo.getDouble("containersCPUUsage"));
|
||||
resourceInfo.getDouble("containersCPUUsage"),
|
||||
nodeInfo.getInt("numRunningOpportContainers"),
|
||||
nodeInfo.getLong("usedMemoryOpport"),
|
||||
nodeInfo.getInt("usedVirtualCoresOpport"),
|
||||
nodeInfo.getInt("numQueuedContainers"));
|
||||
}
|
||||
|
||||
public void verifyNodeInfoGeneric(RMNode node, String state, String rack,
|
||||
String id, String nodeHostName,
|
||||
String nodeHTTPAddress, long lastHealthUpdate, String healthReport,
|
||||
int numContainers, long usedMemoryMB, long availMemoryMB, long usedVirtualCores,
|
||||
long availVirtualCores, String version, int nodePhysicalMemoryMB,
|
||||
int nodeVirtualMemoryMB, double nodeCPUUsage,
|
||||
int numContainers, long usedMemoryMB, long availMemoryMB,
|
||||
long usedVirtualCores, long availVirtualCores, String version,
|
||||
int nodePhysicalMemoryMB, int nodeVirtualMemoryMB, double nodeCPUUsage,
|
||||
int containersPhysicalMemoryMB, int containersVirtualMemoryMB,
|
||||
double containersCPUUsage)
|
||||
double containersCPUUsage, int numRunningOpportContainers,
|
||||
long usedMemoryOpport, int usedVirtualCoresOpport,
|
||||
int numQueuedContainers)
|
||||
throws JSONException, Exception {
|
||||
|
||||
ResourceScheduler sched = rm.getResourceScheduler();
|
||||
SchedulerNodeReport report = sched.getNodeReport(node.getNodeID());
|
||||
OpportunisticContainersStatus opportunisticStatus =
|
||||
node.getOpportunisticContainersStatus();
|
||||
|
||||
WebServicesTestUtils.checkStringMatch("state", node.getState().toString(),
|
||||
state);
|
||||
@ -807,6 +820,20 @@ public void verifyNodeInfoGeneric(RMNode node, String state, String rack,
|
||||
assertEquals("availVirtualCores doesn't match: " + availVirtualCores, report
|
||||
.getAvailableResource().getVirtualCores(), availVirtualCores);
|
||||
}
|
||||
|
||||
if (opportunisticStatus != null) {
|
||||
assertEquals("numRunningOpportContainers doesn't match: " +
|
||||
numRunningOpportContainers,
|
||||
opportunisticStatus.getRunningOpportContainers(),
|
||||
numRunningOpportContainers);
|
||||
assertEquals("usedMemoryOpport doesn't match: " + usedMemoryOpport,
|
||||
opportunisticStatus.getOpportMemoryUsed(), usedMemoryOpport);
|
||||
assertEquals(
|
||||
"usedVirtualCoresOpport doesn't match: " + usedVirtualCoresOpport,
|
||||
opportunisticStatus.getOpportCoresUsed(), usedVirtualCoresOpport);
|
||||
assertEquals("numQueuedContainers doesn't match: " + numQueuedContainers,
|
||||
opportunisticStatus.getQueuedOpportContainers(), numQueuedContainers);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user