YARN-6849. NMContainerStatus should have the Container ExecutionType. (Kartheek Muthyala via asuresh)

Arun Suresh 2017-09-08 09:24:05 -07:00
parent 4a83170be4
commit 1f53ae7972
12 changed files with 178 additions and 9 deletions
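
A minimal sketch (not part of the commit) of the API change this patch makes, based on the NMContainerStatus hunks below: the newInstance factory gains an ExecutionType parameter, while the shorter overload shown in the first hunk keeps defaulting to GUARANTEED. The import path for NMContainerStatus and the main() scaffolding are assumptions for illustration only.

import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
// Assumed package for the yarn-server-common protocol record.
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;

public class NMContainerStatusExecutionTypeSketch {
  public static void main(String[] args) {
    ContainerId cid = ContainerId.newContainerId(
        ApplicationAttemptId.newInstance(ApplicationId.newInstance(0L, 1), 1), 1);
    // New overload added by this patch: the NM can now report the container's
    // ExecutionType when it (re-)registers with the RM.
    NMContainerStatus status = NMContainerStatus.newInstance(
        cid, /* version */ 0, ContainerState.RUNNING,
        Resource.newInstance(1024, 1), "diagnostics", /* exitStatus */ 0,
        Priority.newInstance(0), System.currentTimeMillis(),
        /* nodeLabelExpression */ "", ExecutionType.OPPORTUNISTIC);
    System.out.println(status.getExecutionType()); // OPPORTUNISTIC
  }
}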

View File

@@ -21,6 +21,7 @@
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
@@ -40,13 +41,14 @@ public static NMContainerStatus newInstance(ContainerId containerId,
long creationTime) {
return newInstance(containerId, version, containerState, allocatedResource,
diagnostics, containerExitStatus, priority, creationTime,
CommonNodeLabelsManager.NO_LABEL);
CommonNodeLabelsManager.NO_LABEL, ExecutionType.GUARANTEED);
}
public static NMContainerStatus newInstance(ContainerId containerId,
int version, ContainerState containerState, Resource allocatedResource,
String diagnostics, int containerExitStatus, Priority priority,
long creationTime, String nodeLabelExpression) {
long creationTime, String nodeLabelExpression,
ExecutionType executionType) {
NMContainerStatus status =
Records.newRecord(NMContainerStatus.class);
status.setContainerId(containerId);
@@ -58,6 +60,7 @@ public static NMContainerStatus newInstance(ContainerId containerId,
status.setPriority(priority);
status.setCreationTime(creationTime);
status.setNodeLabelExpression(nodeLabelExpression);
status.setExecutionType(executionType);
return status;
}
@@ -134,4 +137,14 @@ public int getVersion() {
public void setVersion(int version) {
}
/**
* Get the <code>ExecutionType</code> of the container.
* @return <code>ExecutionType</code> of the container
*/
public ExecutionType getExecutionType() {
return ExecutionType.GUARANTEED;
}
public void setExecutionType(ExecutionType executionType) { }
}

View File

@@ -20,6 +20,7 @@
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
@@ -27,6 +28,7 @@
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.proto.YarnProtos;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStateProto;
import org.apache.hadoop.yarn.proto.YarnProtos.PriorityProto;
@@ -249,6 +251,25 @@ public void setNodeLabelExpression(String nodeLabelExpression) {
builder.setNodeLabelExpression(nodeLabelExpression);
}
@Override
public synchronized ExecutionType getExecutionType() {
NMContainerStatusProtoOrBuilder p = viaProto ? proto : builder;
if (!p.hasExecutionType()) {
return ExecutionType.GUARANTEED;
}
return convertFromProtoFormat(p.getExecutionType());
}
@Override
public synchronized void setExecutionType(ExecutionType executionType) {
maybeInitBuilder();
if (executionType == null) {
builder.clearExecutionType();
return;
}
builder.setExecutionType(convertToProtoFormat(executionType));
}
private void mergeLocalToBuilder() {
if (this.containerId != null
&& !((ContainerIdPBImpl) containerId).getProto().equals(
@@ -313,4 +334,13 @@ private PriorityPBImpl convertFromProtoFormat(PriorityProto p) {
private PriorityProto convertToProtoFormat(Priority t) {
return ((PriorityPBImpl)t).getProto();
}
private ExecutionType convertFromProtoFormat(
YarnProtos.ExecutionTypeProto e) {
return ProtoUtils.convertFromProtoFormat(e);
}
private YarnProtos.ExecutionTypeProto convertToProtoFormat(ExecutionType e) {
return ProtoUtils.convertToProtoFormat(e);
}
}

View File

@@ -174,6 +174,7 @@ message NMContainerStatusProto {
optional int64 creation_time = 7;
optional string nodeLabelExpression = 8;
optional int32 version = 9;
optional ExecutionTypeProto executionType = 10 [default = GUARANTEED];
}
message SCMUploaderNotifyRequestProto {

View File

@@ -632,7 +632,8 @@ public NMContainerStatus getNMContainerStatus() {
getCurrentState(), getResource(), diagnostics.toString(), exitCode,
containerTokenIdentifier.getPriority(),
containerTokenIdentifier.getCreationTime(),
containerTokenIdentifier.getNodeLabelExpression());
containerTokenIdentifier.getNodeLabelExpression(),
containerTokenIdentifier.getExecutionType());
} finally {
this.readLock.unlock();
}

View File

@@ -847,7 +847,8 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
containers = startEvent.getNMContainerStatuses();
if (containers != null && !containers.isEmpty()) {
for (NMContainerStatus container : containers) {
if (container.getContainerState() == ContainerState.RUNNING) {
if (container.getContainerState() == ContainerState.RUNNING ||
container.getContainerState() == ContainerState.SCHEDULED) {
rmNode.launchedContainers.add(container.getContainerId());
}
}

View File

@@ -529,6 +529,7 @@ private RMContainer recoverAndCreateContainer(NMContainerStatus status,
node.getHttpAddress(), status.getAllocatedResource(),
status.getPriority(), null);
container.setVersion(status.getVersion());
container.setExecutionType(status.getExecutionType());
ApplicationAttemptId attemptId =
container.getId().getApplicationAttemptId();
RMContainer rmContainer = new RMContainerImpl(container,

View File

@@ -38,6 +38,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
@@ -525,6 +526,9 @@ public void transferStateFromPreviousAppSchedulingInfo(
}
public void recoverContainer(RMContainer rmContainer, String partition) {
if (rmContainer.getExecutionType() != ExecutionType.GUARANTEED) {
return;
}
try {
this.writeLock.lock();
QueueMetrics metrics = queue.getMetrics();

View File

@@ -1131,9 +1131,11 @@ public void recoverContainer(SchedulerNode node,
}
LOG.info("SchedulerAttempt " + getApplicationAttemptId()
+ " is recovering container " + rmContainer.getContainerId());
liveContainers.put(rmContainer.getContainerId(), rmContainer);
attemptResourceUsage.incUsed(node.getPartition(),
rmContainer.getContainer().getResource());
addRMContainer(rmContainer.getContainerId(), rmContainer);
if (rmContainer.getExecutionType() == ExecutionType.GUARANTEED) {
attemptResourceUsage.incUsed(node.getPartition(),
rmContainer.getContainer().getResource());
}
// resourceLimit: updated when LeafQueue#recoverContainer#allocateResource
// is called.
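
A hedged illustration (not from the patch) of the recovery-time accounting rule that the scheduler hunks above and below apply: every recovered container stays tracked as a live container, but only GUARANTEED containers are charged to the attempt's and queue's resource usage. The class and helper names are illustrative; Resources.add is YARN's existing resource arithmetic utility.

import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.resource.Resources;

public class RecoveryAccountingSketch {
  // Returns the updated "used" resource after recovering one container.
  static Resource chargeOnRecovery(ExecutionType type, Resource allocated,
      Resource usedSoFar) {
    if (type != ExecutionType.GUARANTEED) {
      // OPPORTUNISTIC containers are recovered but skipped for accounting,
      // mirroring the ExecutionType.GUARANTEED guards added in this commit.
      return usedSoFar;
    }
    return Resources.add(usedSoFar, allocated);
  }

  public static void main(String[] args) {
    Resource used = Resource.newInstance(0, 0);
    used = chargeOnRecovery(ExecutionType.GUARANTEED,
        Resource.newInstance(2048, 1), used);      // charged
    used = chargeOnRecovery(ExecutionType.OPPORTUNISTIC,
        Resource.newInstance(2048, 1), used);      // not charged
    System.out.println(used); // 2048 MB and 1 vcore accounted for
  }
}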

View File

@@ -1802,7 +1802,9 @@ public void recoverContainer(Resource clusterResource,
if (rmContainer.getState().equals(RMContainerState.COMPLETED)) {
return;
}
if (rmContainer.getExecutionType() != ExecutionType.GUARANTEED) {
return;
}
// Careful! Locking order is important!
try {
writeLock.lock();

View File

@@ -37,6 +37,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueState;
@@ -863,6 +864,9 @@ public void recoverContainer(Resource clusterResource,
if (rmContainer.getState().equals(RMContainerState.COMPLETED)) {
return;
}
if (rmContainer.getExecutionType() != ExecutionType.GUARANTEED) {
return;
}
// Careful! Locking order is important!
try {

View File

@@ -77,6 +77,7 @@
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
@@ -2097,7 +2098,8 @@ public static NMContainerStatus createNMContainerStatus(
NMContainerStatus containerReport =
NMContainerStatus.newInstance(containerId, 0, containerState,
Resource.newInstance(1024, 1), "recover container", 0,
Priority.newInstance(0), 0, nodeLabelExpression);
Priority.newInstance(0), 0, nodeLabelExpression,
ExecutionType.GUARANTEED);
return containerReport;
}

View File

@@ -29,8 +29,11 @@
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -44,15 +47,19 @@
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
@@ -74,10 +81,14 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService;
import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
@@ -2026,6 +2037,103 @@ public void tearDown() {
}
}
@SuppressWarnings("unchecked")
@Test
public void testHandleOpportunisticContainerStatus() throws Exception{
final DrainDispatcher dispatcher = new DrainDispatcher();
YarnConfiguration conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED,
true);
rm = new MockRM(conf){
@Override
protected Dispatcher createDispatcher() {
return dispatcher;
}
};
rm.start();
RMApp app = rm.submitApp(1024, true);
ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
.getAppAttemptId();
ResourceTrackerService resourceTrackerService =
rm.getResourceTrackerService();
SchedulerApplicationAttempt applicationAttempt = null;
while (applicationAttempt == null) {
applicationAttempt =
((AbstractYarnScheduler)rm.getRMContext().getScheduler())
.getApplicationAttempt(appAttemptId);
Thread.sleep(100);
}
Resource currentConsumption = applicationAttempt.getCurrentConsumption();
Assert.assertEquals(Resource.newInstance(0, 0), currentConsumption);
Resource allocResources =
applicationAttempt.getQueue().getMetrics().getAllocatedResources();
Assert.assertEquals(Resource.newInstance(0, 0), allocResources);
RegisterNodeManagerRequest req = Records.newRecord(
RegisterNodeManagerRequest.class);
NodeId nodeId = NodeId.newInstance("host2", 1234);
Resource capability = BuilderUtils.newResource(1024, 1);
req.setResource(capability);
req.setNodeId(nodeId);
req.setHttpPort(1234);
req.setNMVersion(YarnVersionInfo.getVersion());
ContainerId c1 = ContainerId.newContainerId(appAttemptId, 1);
ContainerId c2 = ContainerId.newContainerId(appAttemptId, 2);
ContainerId c3 = ContainerId.newContainerId(appAttemptId, 3);
NMContainerStatus queuedOpp =
NMContainerStatus.newInstance(c1, 1, ContainerState.SCHEDULED,
Resource.newInstance(1024, 1), "Dummy Queued OC",
ContainerExitStatus.INVALID, Priority.newInstance(5), 1234, "",
ExecutionType.OPPORTUNISTIC);
NMContainerStatus runningOpp =
NMContainerStatus.newInstance(c2, 1, ContainerState.RUNNING,
Resource.newInstance(2048, 1), "Dummy Running OC",
ContainerExitStatus.INVALID, Priority.newInstance(6), 1234, "",
ExecutionType.OPPORTUNISTIC);
NMContainerStatus runningGuar =
NMContainerStatus.newInstance(c3, 1, ContainerState.RUNNING,
Resource.newInstance(2048, 1), "Dummy Running GC",
ContainerExitStatus.INVALID, Priority.newInstance(6), 1234, "",
ExecutionType.GUARANTEED);
req.setContainerStatuses(Arrays.asList(queuedOpp, runningOpp, runningGuar));
// Register the node; its NM reports the three recovered container statuses.
RegisterNodeManagerResponse response =
resourceTrackerService.registerNodeManager(req);
dispatcher.await();
Thread.sleep(2000);
dispatcher.await();
Assert.assertEquals(NodeAction.NORMAL, response.getNodeAction());
Collection<RMContainer> liveContainers = applicationAttempt
.getLiveContainers();
Assert.assertEquals(3, liveContainers.size());
Iterator<RMContainer> iter = liveContainers.iterator();
while (iter.hasNext()) {
RMContainer rc = iter.next();
Assert.assertEquals(
rc.getContainerId().equals(c3) ?
ExecutionType.GUARANTEED : ExecutionType.OPPORTUNISTIC,
rc.getExecutionType());
}
// Should only include GUARANTEED resources
currentConsumption = applicationAttempt.getCurrentConsumption();
Assert.assertEquals(Resource.newInstance(2048, 1), currentConsumption);
allocResources =
applicationAttempt.getQueue().getMetrics().getAllocatedResources();
Assert.assertEquals(Resource.newInstance(2048, 1), allocResources);
SchedulerNode schedulerNode =
rm.getRMContext().getScheduler().getSchedulerNode(nodeId);
Assert.assertNotNull(schedulerNode);
Resource nodeResources = schedulerNode.getAllocatedResource();
Assert.assertEquals(Resource.newInstance(2048, 1), nodeResources);
}
@Test(timeout = 60000)
public void testNodeHeartBeatResponseForUnknownContainerCleanUp()
throws Exception {