YARN-5171. Extend DistributedSchedulerProtocol to notify RM of containers allocated by the Node. (Inigo Goiri via asuresh)

This commit is contained in:
Arun Suresh 2016-06-22 19:04:54 -07:00
parent 79a7289165
commit 99e5dd68d0
19 changed files with 572 additions and 47 deletions

View File

@ -47,12 +47,15 @@
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.junit.After;
import org.junit.Assert;
@ -64,10 +67,14 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ -166,8 +173,18 @@ public void testOpportunisticExecutionTypeRequestE2E() throws Exception {
Assert.assertNotNull(responseRegister.getSchedulerResourceTypes());
Assert.assertNotNull(responseRegister.getMaximumResourceCapability());
RMApp rmApp =
cluster.getResourceManager().getRMContext().getRMApps().get(appId);
// Wait until the RM has been updated and verify
Map<ApplicationId, RMApp> rmApps =
cluster.getResourceManager().getRMContext().getRMApps();
boolean rmUpdated = false;
for (int i=0; i<10 && !rmUpdated; i++) {
sleep(100);
RMApp rmApp = rmApps.get(appId);
if (rmApp.getState() == RMAppState.RUNNING) {
rmUpdated = true;
}
}
RMApp rmApp = rmApps.get(appId);
Assert.assertEquals(RMAppState.RUNNING, rmApp.getState());
LOG.info("testDistributedSchedulingE2E - Allocate");
@ -207,6 +224,17 @@ public void testOpportunisticExecutionTypeRequestE2E() throws Exception {
containerTokenIdentifier.getExecutionType());
}
// Check that the RM sees OPPORTUNISTIC containers
ResourceScheduler scheduler = cluster.getResourceManager()
.getResourceScheduler();
for (Container allocatedContainer : allocResponse
.getAllocatedContainers()) {
ContainerId containerId = allocatedContainer.getId();
RMContainer rmContainer = scheduler.getRMContainer(containerId);
Assert.assertEquals(ExecutionType.OPPORTUNISTIC,
rmContainer.getExecutionType());
}
LOG.info("testDistributedSchedulingE2E - Finish");
}
@ -512,6 +540,97 @@ public AllocateResponse answer(InvocationOnMock invocation)
}
}
/**
 * Check if an AM can ask for opportunistic containers and get them.
 *
 * <p>The test registers an AM, requests a single OPPORTUNISTIC container on
 * the first running node, verifies that the RM scheduler tracks the container
 * as OPPORTUNISTIC, releases it, and finally verifies that the RM scheduler
 * no longer tracks it.
 *
 * @throws Exception if any interaction with the cluster fails
 */
@Test
public void testAMOpportunistic() throws Exception {
  // Basic container to request
  Resource capability = Resource.newInstance(1024, 1);
  Priority priority = Priority.newInstance(1);

  // Get the cluster topology
  List<NodeReport> nodeReports = rmClient.getNodeReports(NodeState.RUNNING);
  String node = nodeReports.get(0).getNodeId().getHost();
  String rack = nodeReports.get(0).getRackName();
  String[] nodes = new String[]{node};
  String[] racks = new String[]{rack};

  // Create an AM to request resources
  AMRMClient<AMRMClient.ContainerRequest> amClient = null;
  try {
    amClient = new AMRMClientImpl<AMRMClient.ContainerRequest>(client);
    amClient.init(yarnConf);
    amClient.start();
    amClient.registerApplicationMaster(NetUtils.getHostname(), 1024, "");

    // AM requests an opportunistic container
    ExecutionTypeRequest execTypeRequest =
        ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC, true);
    ContainerRequest containerRequest = new AMRMClient.ContainerRequest(
        capability, nodes, racks, priority, true, null, execTypeRequest);
    amClient.addContainerRequest(containerRequest);

    // Wait until the container is allocated (at most 10 allocate rounds)
    ContainerId opportunisticContainerId = null;
    for (int i = 0; i < 10 && opportunisticContainerId == null; i++) {
      AllocateResponse allocResponse = amClient.allocate(0.1f);
      List<Container> allocatedContainers =
          allocResponse.getAllocatedContainers();
      for (Container allocatedContainer : allocatedContainers) {
        // Check that this is the container we required
        assertEquals(ExecutionType.OPPORTUNISTIC,
            allocatedContainer.getExecutionType());
        opportunisticContainerId = allocatedContainer.getId();
      }
      sleep(100);
    }
    assertNotNull(opportunisticContainerId);

    // The RM sees the container as OPPORTUNISTIC
    ResourceScheduler scheduler = cluster.getResourceManager()
        .getResourceScheduler();
    RMContainer rmContainer = scheduler.getRMContainer(
        opportunisticContainerId);
    // Fail with an assertion instead of a NullPointerException if the RM
    // was never told about the container.
    assertNotNull(rmContainer);
    assertEquals(ExecutionType.OPPORTUNISTIC,
        rmContainer.getExecutionType());

    // Release the opportunistic container
    amClient.releaseAssignedContainer(opportunisticContainerId);

    // Wait for the release container to appear
    boolean released = false;
    for (int i = 0; i < 10 && !released; i++) {
      AllocateResponse allocResponse = amClient.allocate(0.1f);
      List<ContainerStatus> completedContainers =
          allocResponse.getCompletedContainersStatuses();
      for (ContainerStatus completedContainer : completedContainers) {
        ContainerId completedContainerId =
            completedContainer.getContainerId();
        // Expected value first so a failure message reads correctly
        assertEquals(opportunisticContainerId, completedContainerId);
        released = true;
      }
      if (!released) {
        sleep(100);
      }
    }
    assertTrue(released);

    // The RM shouldn't see the container anymore
    rmContainer = scheduler.getRMContainer(opportunisticContainerId);
    assertNull(rmContainer);

    // Clean the AM
    amClient.unregisterApplicationMaster(
        FinalApplicationStatus.SUCCEEDED, null, null);
  } finally {
    // Only close a client that actually reached the STARTED state
    if (amClient != null &&
        amClient.getServiceState() == Service.STATE.STARTED) {
      amClient.close();
    }
  }
}
private void sleep(int sleepTime) {
try {
Thread.sleep(sleepTime);

View File

@ -26,6 +26,7 @@
import org.apache.hadoop.yarn.api.records.AMCommand;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
@ -339,4 +340,17 @@ public static ExecutionTypeRequest convertFromProtoFormat(
ExecutionTypeRequestProto e) {
return new ExecutionTypeRequestPBImpl(e);
}
/*
* Container
*/
/**
 * Convert a {@link Container} record into its protobuf form.
 * The record is cast to {@link ContainerPBImpl} and its proto is returned.
 */
public static YarnProtos.ContainerProto convertToProtoFormat(
    Container t) {
  ContainerPBImpl containerImpl = (ContainerPBImpl) t;
  return containerImpl.getProto();
}
/**
 * Wrap a container status proto in a new {@link ContainerStatusPBImpl}.
 */
public static ContainerStatusPBImpl convertFromProtoFormat(
    YarnProtos.ContainerStatusProto p) {
  ContainerStatusPBImpl status = new ContainerStatusPBImpl(p);
  return status;
}
}

View File

@ -22,7 +22,7 @@
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.io.retry.Idempotent;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
@ -74,5 +74,5 @@ DistSchedRegisterResponse registerApplicationMasterForDistributedScheduling(
@Unstable
@Idempotent
DistSchedAllocateResponse allocateForDistributedScheduling(
AllocateRequest request) throws YarnException, IOException;
DistSchedAllocateRequest request) throws YarnException, IOException;
}

View File

@ -22,9 +22,11 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos;
import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
@ -35,6 +37,7 @@
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateResponsePBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistSchedAllocateRequestPBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistSchedAllocateResponsePBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistSchedRegisterResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
@ -77,9 +80,9 @@ public void close() {
@Override
public DistSchedRegisterResponse
registerApplicationMasterForDistributedScheduling
(RegisterApplicationMasterRequest request) throws YarnException,
IOException {
registerApplicationMasterForDistributedScheduling(
RegisterApplicationMasterRequest request)
throws YarnException, IOException {
YarnServiceProtos.RegisterApplicationMasterRequestProto requestProto =
((RegisterApplicationMasterRequestPBImpl) request).getProto();
try {
@ -93,10 +96,10 @@ public void close() {
}
@Override
public DistSchedAllocateResponse allocateForDistributedScheduling
(AllocateRequest request) throws YarnException, IOException {
YarnServiceProtos.AllocateRequestProto requestProto =
((AllocateRequestPBImpl) request).getProto();
public DistSchedAllocateResponse allocateForDistributedScheduling(
DistSchedAllocateRequest request) throws YarnException, IOException {
YarnServerCommonServiceProtos.DistSchedAllocateRequestProto requestProto =
((DistSchedAllocateRequestPBImpl) request).getProto();
try {
return new DistSchedAllocateResponsePBImpl(
proxy.allocateForDistributedScheduling(null, requestProto));

View File

@ -31,6 +31,7 @@
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateResponsePBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistSchedAllocateRequestPBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistSchedAllocateResponsePBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistSchedRegisterResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
@ -77,8 +78,10 @@ public DistributedSchedulerProtocolPBServiceImpl(
@Override
public YarnServerCommonServiceProtos.DistSchedAllocateResponseProto
allocateForDistributedScheduling(RpcController controller,
AllocateRequestProto proto) throws ServiceException {
AllocateRequestPBImpl request = new AllocateRequestPBImpl(proto);
YarnServerCommonServiceProtos.DistSchedAllocateRequestProto proto)
throws ServiceException {
DistSchedAllocateRequestPBImpl request =
new DistSchedAllocateRequestPBImpl(proto);
try {
DistSchedAllocateResponse response = real
.allocateForDistributedScheduling(request);

View File

@ -0,0 +1,69 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.api.protocolrecords;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.records.Container;
import java.util.List;
/**
* Request for a distributed scheduler to notify allocation of containers to
* the Resource Manager. It pairs the underlying <code>AllocateRequest</code>
* with the list of containers that were newly allocated by the Distributed
* Scheduling component on the NodeManager.
*/
@Public
@Evolving
public abstract class DistSchedAllocateRequest {
/**
* Get the underlying <code>AllocateRequest</code> object.
* @return the wrapped allocate request; the protobuf-backed implementation
*         returns <code>null</code> when none has been set
*/
@Public
@Evolving
public abstract AllocateRequest getAllocateRequest();
/**
* Set the underlying <code>AllocateRequest</code> object.
* @param allocateRequest the allocate request to wrap
*/
@Public
@Evolving
public abstract void setAllocateRequest(AllocateRequest allocateRequest);
/**
* Get the list of <em>newly allocated</em> <code>Container</code> by the
* Distributed Scheduling component on the NodeManager.
* @return list of <em>newly allocated</em> <code>Container</code>
*/
@Public
@Evolving
public abstract List<Container> getAllocatedContainers();
/**
* Set the list of <em>newly allocated</em> <code>Container</code> by the
* Distributed Scheduling component on the NodeManager.
* @param containers list of <em>newly allocated</em> <code>Container</code>;
*        the protobuf-backed implementation treats <code>null</code> or an
*        empty list as a request to clear the current list
*/
@Public
@Evolving
public abstract void setAllocatedContainers(List<Container> containers);
}

View File

@ -0,0 +1,185 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
import java.util.ArrayList;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.DistSchedAllocateRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.DistSchedAllocateRequestProtoOrBuilder;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateRequestProto;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateRequest;
import java.util.Iterator;
import java.util.List;
/**
 * Implementation of {@link DistSchedAllocateRequest} for a distributed
 * scheduler to notify about the allocation of containers to the Resource
 * Manager.
 *
 * <p>The record is backed either by an immutable {@code proto} (when
 * {@code viaProto} is true) or by a mutable {@code builder}. Values passed to
 * the setters are cached in local fields and only merged into the builder
 * when {@link #getProto()} is called.
 */
public class DistSchedAllocateRequestPBImpl extends DistSchedAllocateRequest {

  private DistSchedAllocateRequestProto.Builder builder = null;
  private boolean viaProto = false;

  private DistSchedAllocateRequestProto proto;
  // Locally cached views; null means "not loaded / not set locally".
  private AllocateRequest allocateRequest;
  private List<Container> containers;

  /** Create an empty request backed by a fresh builder. */
  public DistSchedAllocateRequestPBImpl() {
    builder = DistSchedAllocateRequestProto.newBuilder();
  }

  /**
   * Create a request backed by an existing proto.
   * @param proto the proto to wrap
   */
  public DistSchedAllocateRequestPBImpl(DistSchedAllocateRequestProto proto) {
    this.proto = proto;
    this.viaProto = true;
  }

  /**
   * {@inheritDoc}
   * @return the cached request if present, otherwise the one decoded from
   *         the proto/builder, or {@code null} if none is set
   */
  @Override
  public AllocateRequest getAllocateRequest() {
    if (this.allocateRequest != null) {
      return this.allocateRequest;
    }
    DistSchedAllocateRequestProtoOrBuilder p = viaProto ? proto : builder;
    if (!p.hasAllocateRequest()) {
      return null;
    }
    this.allocateRequest = convertFromProtoFormat(p.getAllocateRequest());
    return this.allocateRequest;
  }

  /**
   * {@inheritDoc}
   * Passing {@code null} clears any allocate request held by the builder.
   */
  @Override
  public void setAllocateRequest(AllocateRequest pAllocateRequest) {
    maybeInitBuilder();
    // Check the incoming value, not the currently cached field: otherwise
    // resetting to null would leave a stale request in the builder that
    // getAllocateRequest() would later resurrect.
    if (pAllocateRequest == null) {
      builder.clearAllocateRequest();
    }
    this.allocateRequest = pAllocateRequest;
  }

  /**
   * {@inheritDoc}
   * The list is decoded from the proto/builder on first access and cached.
   */
  @Override
  public List<Container> getAllocatedContainers() {
    if (this.containers == null) {
      initAllocatedContainers();
    }
    return this.containers;
  }

  /** Populate the local container cache from the proto/builder. */
  private void initAllocatedContainers() {
    DistSchedAllocateRequestProtoOrBuilder p = viaProto ? proto : builder;
    List<ContainerProto> list = p.getAllocatedContainersList();
    this.containers = new ArrayList<Container>();
    for (ContainerProto c : list) {
      this.containers.add(convertFromProtoFormat(c));
    }
  }

  /**
   * {@inheritDoc}
   * A {@code null} or empty list clears both the local cache and the builder.
   * Otherwise the given list is copied into a new local list.
   */
  @Override
  public void setAllocatedContainers(List<Container> pContainers) {
    maybeInitBuilder();
    if (pContainers == null || pContainers.isEmpty()) {
      if (this.containers != null) {
        this.containers.clear();
      }
      builder.clearAllocatedContainers();
      return;
    }
    this.containers = new ArrayList<>(pContainers);
  }

  /**
   * Merge locally cached values into the proto and return it.
   * @return the proto representing the current state of this record
   */
  public DistSchedAllocateRequestProto getProto() {
    mergeLocalToProto();
    proto = viaProto ? proto : builder.build();
    viaProto = true;
    return proto;
  }

  /** Switch to builder-backed mode, seeding the builder from the proto. */
  private void maybeInitBuilder() {
    if (viaProto || builder == null) {
      builder = DistSchedAllocateRequestProto.newBuilder(proto);
    }
    viaProto = false;
  }

  /** Fold the local caches into the builder and rebuild the proto. */
  private void mergeLocalToProto() {
    if (viaProto) {
      maybeInitBuilder();
    }
    mergeLocalToBuilder();
    proto = builder.build();
    viaProto = true;
  }

  /** Copy locally cached containers and allocate request to the builder. */
  private void mergeLocalToBuilder() {
    if (this.containers != null) {
      builder.clearAllocatedContainers();
      Iterable<ContainerProto> iterable =
          getContainerProtoIterable(this.containers);
      builder.addAllAllocatedContainers(iterable);
    }
    if (this.allocateRequest != null) {
      builder.setAllocateRequest(
          ((AllocateRequestPBImpl)this.allocateRequest).getProto());
    }
  }

  /**
   * Lazily convert a list of containers to their proto form.
   * Only called while the builder is live, so no builder initialization is
   * needed here.
   */
  private Iterable<ContainerProto> getContainerProtoIterable(
      final List<Container> newContainersList) {
    return new Iterable<ContainerProto>() {
      @Override
      public synchronized Iterator<ContainerProto> iterator() {
        return new Iterator<ContainerProto>() {

          private final Iterator<Container> iter =
              newContainersList.iterator();

          @Override
          public synchronized boolean hasNext() {
            return iter.hasNext();
          }

          @Override
          public synchronized ContainerProto next() {
            return ProtoUtils.convertToProtoFormat(iter.next());
          }

          @Override
          public synchronized void remove() {
            throw new UnsupportedOperationException();
          }
        };
      }
    };
  }

  private ContainerPBImpl convertFromProtoFormat(ContainerProto p) {
    return new ContainerPBImpl(p);
  }

  private AllocateRequestPBImpl convertFromProtoFormat(AllocateRequestProto p) {
    return new AllocateRequestPBImpl(p);
  }
}

View File

@ -35,5 +35,5 @@ import "yarn_server_common_service_protos.proto";
service DistributedSchedulerProtocolService {
rpc registerApplicationMasterForDistributedScheduling (RegisterApplicationMasterRequestProto) returns (DistSchedRegisterResponseProto);
rpc finishApplicationMaster (FinishApplicationMasterRequestProto) returns (FinishApplicationMasterResponseProto);
rpc allocateForDistributedScheduling (AllocateRequestProto) returns (DistSchedAllocateResponseProto);
rpc allocateForDistributedScheduling (DistSchedAllocateRequestProto) returns (DistSchedAllocateResponseProto);
}

View File

@ -41,6 +41,11 @@ message DistSchedAllocateResponseProto {
repeated NodeIdProto nodes_for_scheduling = 2;
}
// Allocate request used by allocateForDistributedScheduling: wraps the regular
// allocate request and adds the containers that were already allocated.
message DistSchedAllocateRequestProto {
// The underlying allocate request.
optional AllocateRequestProto allocate_request = 1;
// Containers newly allocated by the Distributed Scheduling component on the
// NodeManager.
repeated ContainerProto allocated_containers = 2;
}
message NodeLabelsProto {
repeated NodeLabelProto nodeLabels = 1;
}

View File

@ -21,10 +21,10 @@
import org.apache.hadoop.conf.Configuration;
import com.google.common.base.Preconditions;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords
.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
@ -118,8 +118,8 @@ public AMRMProxyApplicationContext getApplicationContext() {
* @throws IOException
*/
@Override
public DistSchedAllocateResponse allocateForDistributedScheduling
(AllocateRequest request) throws YarnException, IOException {
public DistSchedAllocateResponse allocateForDistributedScheduling(
DistSchedAllocateRequest request) throws YarnException, IOException {
return (this.nextInterceptor != null) ?
this.nextInterceptor.allocateForDistributedScheduling(request) : null;
}
@ -135,9 +135,9 @@ public AMRMProxyApplicationContext getApplicationContext() {
*/
@Override
public DistSchedRegisterResponse
registerApplicationMasterForDistributedScheduling
(RegisterApplicationMasterRequest request) throws YarnException,
IOException {
registerApplicationMasterForDistributedScheduling(
RegisterApplicationMasterRequest request)
throws YarnException, IOException {
return (this.nextInterceptor != null) ? this.nextInterceptor
.registerApplicationMasterForDistributedScheduling(request) : null;
}

View File

@ -47,6 +47,7 @@
import org.apache.hadoop.yarn.server.api.ServerRMProxy;
import org.apache.hadoop.yarn.server.api.protocolrecords
.DistSchedAllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -133,8 +134,8 @@ public AllocateResponse allocate(final AllocateRequest request)
}
@Override
public DistSchedAllocateResponse allocateForDistributedScheduling
(AllocateRequest request) throws YarnException, IOException {
public DistSchedAllocateResponse allocateForDistributedScheduling(
DistSchedAllocateRequest request) throws YarnException, IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Forwarding allocateForDistributedScheduling request" +
"to the real YARN RM");
@ -212,9 +213,9 @@ public AllocateResponse allocate(AllocateRequest request) throws
}
@Override
public DistSchedAllocateResponse
allocateForDistributedScheduling(AllocateRequest request) throws
YarnException, IOException {
public DistSchedAllocateResponse allocateForDistributedScheduling(
DistSchedAllocateRequest request)
throws YarnException, IOException {
throw new IOException("Not Supported !!");
}
};

View File

@ -21,6 +21,9 @@
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
import org.apache.hadoop.yarn.api.protocolrecords
@ -99,6 +102,9 @@ static class DistSchedulerParams {
private static final Logger LOG = LoggerFactory
.getLogger(LocalScheduler.class);
private final static RecordFactory RECORD_FACTORY =
RecordFactoryProvider.getRecordFactory(null);
// Currently just used to keep track of allocated Containers
// Can be used for reporting stats later
private Set<ContainerId> containersAllocated = new HashSet<>();
@ -176,7 +182,10 @@ void initLocal(ApplicationAttemptId applicationAttemptId,
@Override
public AllocateResponse allocate(AllocateRequest request) throws
YarnException, IOException {
return allocateForDistributedScheduling(request).getAllocateResponse();
DistSchedAllocateRequest distRequest =
RECORD_FACTORY.newRecordInstance(DistSchedAllocateRequest.class);
distRequest.setAllocateRequest(request);
return allocateForDistributedScheduling(distRequest).getAllocateResponse();
}
@Override
@ -324,9 +333,9 @@ private void addToNodeList(List<NodeId> nodes) {
@Override
public DistSchedRegisterResponse
registerApplicationMasterForDistributedScheduling
(RegisterApplicationMasterRequest request) throws YarnException,
IOException {
registerApplicationMasterForDistributedScheduling(
RegisterApplicationMasterRequest request)
throws YarnException, IOException {
LOG.info("Forwarding registration request to the" +
"Distributed Scheduler Service on YARN RM");
DistSchedRegisterResponse dsResp = getNextInterceptor()
@ -336,17 +345,18 @@ private void addToNodeList(List<NodeId> nodes) {
}
@Override
public DistSchedAllocateResponse allocateForDistributedScheduling
(AllocateRequest request) throws YarnException, IOException {
public DistSchedAllocateResponse allocateForDistributedScheduling(
DistSchedAllocateRequest request) throws YarnException, IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Forwarding allocate request to the" +
"Distributed Scheduler Service on YARN RM");
}
// Partition requests into GUARANTEED and OPPORTUNISTIC reqs
PartitionedResourceRequests partitionedAsks = partitionAskList(request
.getAskList());
PartitionedResourceRequests partitionedAsks = partitionAskList(
request.getAllocateRequest().getAskList());
List<ContainerId> releasedContainers = request.getReleaseList();
List<ContainerId> releasedContainers =
request.getAllocateRequest().getReleaseList();
int numReleasedContainers = releasedContainers.size();
if (numReleasedContainers > 0) {
LOG.info("AttemptID: " + applicationAttemptId + " released: "
@ -355,7 +365,8 @@ private void addToNodeList(List<NodeId> nodes) {
}
// Also, update black list
ResourceBlacklistRequest rbr = request.getResourceBlacklistRequest();
ResourceBlacklistRequest rbr =
request.getAllocateRequest().getResourceBlacklistRequest();
if (rbr != null) {
blacklist.removeAll(rbr.getBlacklistRemovals());
blacklist.addAll(rbr.getBlacklistAdditions());
@ -381,9 +392,10 @@ private void addToNodeList(List<NodeId> nodes) {
allocatedContainers.addAll(e.getValue());
}
}
request.setAllocatedContainers(allocatedContainers);
// Send all the GUARANTEED Reqs to RM
request.setAskList(partitionedAsks.getGuaranteed());
request.getAllocateRequest().setAskList(partitionedAsks.getGuaranteed());
DistSchedAllocateResponse dsResp =
getNextInterceptor().allocateForDistributedScheduling(request);

View File

@ -23,6 +23,7 @@
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
import org.apache.hadoop.yarn.api.protocolrecords
@ -126,7 +127,7 @@ public void setBytes(ByteBuffer bytes) {}
Mockito.when(
finalReqIntcptr.allocateForDistributedScheduling(
Mockito.any(AllocateRequest.class)))
Mockito.any(DistSchedAllocateRequest.class)))
.thenAnswer(new Answer<DistSchedAllocateResponse>() {
@Override
public DistSchedAllocateResponse answer(InvocationOnMock

View File

@ -24,6 +24,7 @@
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol;
@ -32,6 +33,7 @@
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
@ -45,6 +47,12 @@
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.proto.ApplicationMasterProtocol.ApplicationMasterProtocolService;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.NodeQueueLoadMonitor;
@ -60,6 +68,7 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@ -229,9 +238,23 @@ public AllocateResponse allocate(AllocateRequest request) throws
}
@Override
public DistSchedAllocateResponse allocateForDistributedScheduling
(AllocateRequest request) throws YarnException, IOException {
AllocateResponse response = allocate(request);
public DistSchedAllocateResponse allocateForDistributedScheduling(
DistSchedAllocateRequest request) throws YarnException, IOException {
List<Container> distAllocContainers = request.getAllocatedContainers();
for (Container container : distAllocContainers) {
// Create RMContainer
SchedulerApplicationAttempt appAttempt =
((AbstractYarnScheduler) rmContext.getScheduler())
.getCurrentAttemptForContainer(container.getId());
RMContainer rmContainer = new RMContainerImpl(container,
appAttempt.getApplicationAttemptId(), container.getNodeId(),
appAttempt.getUser(), rmContext, true);
appAttempt.addRMContainer(container.getId(), rmContainer);
rmContainer.handle(
new RMContainerEvent(container.getId(),
RMContainerEventType.LAUNCHED));
}
AllocateResponse response = allocate(request.getAllocateRequest());
DistSchedAllocateResponse dsResp = recordFactory.newRecordInstance
(DistSchedAllocateResponse.class);
dsResp.setAllocateResponse(response);

View File

@ -25,6 +25,7 @@
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerReport;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
@ -91,4 +92,14 @@ public interface RMContainer extends EventHandler<RMContainerEvent> {
void cancelIncreaseReservation();
String getQueueName();
/**
* Get the {@link ExecutionType} of this container.
* @return the container's execution type
*/
ExecutionType getExecutionType();
/**
* Whether the container was allocated by a component other than the Resource
* Manager (e.g., the distributed scheduler in the NM
* <code>LocalScheduler</code>).
* @return <code>true</code> if the container was allocated remotely.
*/
boolean isRemotelyAllocated();
}

View File

@ -35,6 +35,7 @@
import org.apache.hadoop.yarn.api.records.ContainerReport;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
@ -79,6 +80,8 @@ RMContainerEventType.START, new ContainerStartedTransition())
RMContainerEventType.KILL)
.addTransition(RMContainerState.NEW, RMContainerState.RESERVED,
RMContainerEventType.RESERVED, new ContainerReservedTransition())
.addTransition(RMContainerState.NEW, RMContainerState.RUNNING,
RMContainerEventType.LAUNCHED)
.addTransition(RMContainerState.NEW,
EnumSet.of(RMContainerState.RUNNING, RMContainerState.COMPLETED),
RMContainerEventType.RECOVER, new ContainerRecoveredTransition())
@ -183,6 +186,8 @@ RMContainerEventType.CHANGE_RESOURCE, new ChangeResourceTransition())
private Resource lastConfirmedResource;
private volatile String queueName;
private boolean isExternallyAllocated;
public RMContainerImpl(Container container,
ApplicationAttemptId appAttemptId, NodeId nodeId, String user,
RMContext rmContext) {
@ -190,6 +195,13 @@ public RMContainerImpl(Container container,
.currentTimeMillis(), "");
}
/**
 * Convenience constructor that lets the caller state whether the container
 * was allocated externally. Delegates to the full constructor using the
 * current wall-clock time as the creation time and an empty node label
 * expression.
 *
 * @param container the container being tracked
 * @param appAttemptId the application attempt the container belongs to
 * @param nodeId the node hosting the container
 * @param user the user owning the container
 * @param rmContext the Resource Manager context
 * @param isExternallyAllocated value later reported by
 *          {@link #isRemotelyAllocated()}
 */
public RMContainerImpl(Container container,
ApplicationAttemptId appAttemptId, NodeId nodeId, String user,
RMContext rmContext, boolean isExternallyAllocated) {
this(container, appAttemptId, nodeId, user, rmContext, System
.currentTimeMillis(), "", isExternallyAllocated);
}
private boolean saveNonAMContainerMetaInfo;
public RMContainerImpl(Container container,
@ -202,6 +214,14 @@ public RMContainerImpl(Container container,
/**
 * Constructor for callers that do not specify how the container was
 * allocated. Delegates to the full constructor with
 * <code>isExternallyAllocated</code> set to <code>false</code>.
 *
 * @param container the container being tracked
 * @param appAttemptId the application attempt the container belongs to
 * @param nodeId the node hosting the container
 * @param user the user owning the container
 * @param rmContext the Resource Manager context
 * @param creationTime creation time passed through to the full constructor
 * @param nodeLabelExpression node label expression passed through to the
 *          full constructor
 */
public RMContainerImpl(Container container,
ApplicationAttemptId appAttemptId, NodeId nodeId, String user,
RMContext rmContext, long creationTime, String nodeLabelExpression) {
this(container, appAttemptId, nodeId, user, rmContext, creationTime,
nodeLabelExpression, false);
}
public RMContainerImpl(Container container,
ApplicationAttemptId appAttemptId, NodeId nodeId, String user,
RMContext rmContext, long creationTime, String nodeLabelExpression,
boolean isExternallyAllocated) {
this.stateMachine = stateMachineFactory.make(this);
this.containerId = container.getId();
this.nodeId = nodeId;
@ -216,6 +236,7 @@ public RMContainerImpl(Container container,
this.resourceRequests = null;
this.nodeLabelExpression = nodeLabelExpression;
this.lastConfirmedResource = container.getResource();
this.isExternallyAllocated = isExternallyAllocated;
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
this.readLock = lock.readLock();
@ -827,4 +848,14 @@ public void setQueueName(String queueName) {
public String getQueueName() {
// Plain read without taking a lock; the queueName field is declared
// volatile.
return queueName;
}
/**
 * Returns the execution type reported by the wrapped container record.
 *
 * @return the {@link ExecutionType} of the underlying container
 */
@Override
public ExecutionType getExecutionType() {
return container.getExecutionType();
}
/**
 * Reports the <code>isExternallyAllocated</code> flag supplied when this
 * container was constructed.
 *
 * @return <code>true</code> if the container was flagged as externally
 *         allocated at construction time
 */
@Override
public boolean isRemotelyAllocated() {
return isExternallyAllocated;
}
}

View File

@ -516,7 +516,23 @@ public void completedContainer(RMContainer rmContainer,
return;
}
completedContainerInternal(rmContainer, containerStatus, event);
if (!rmContainer.isRemotelyAllocated()) {
completedContainerInternal(rmContainer, containerStatus, event);
} else {
ContainerId containerId = rmContainer.getContainerId();
// Inform the container
rmContainer.handle(
new RMContainerFinishedEvent(containerId, containerStatus, event));
SchedulerApplicationAttempt schedulerAttempt =
getCurrentAttemptForContainer(containerId);
if (schedulerAttempt != null) {
schedulerAttempt.removeRMContainer(containerId);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Completed container: " + rmContainer.getContainerId() +
" in state: " + rmContainer.getState() + " event:" + event);
}
}
// If the container is getting killed in ACQUIRED state, the requester (AM
// for regular containers and RM itself for AM container) will not know what

View File

@ -112,6 +112,9 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
private boolean isAttemptRecovering;
protected ResourceUsage attemptResourceUsage = new ResourceUsage();
/** Scheduled by a remote scheduler. */
protected ResourceUsage attemptResourceUsageAllocatedRemotely =
new ResourceUsage();
private AtomicLong firstAllocationRequestSentTime = new AtomicLong(0);
private AtomicLong firstContainerAllocatedTime = new AtomicLong(0);
@ -288,6 +291,23 @@ public synchronized RMContainer getRMContainer(ContainerId id) {
return liveContainers.get(id);
}
/**
 * Registers a container as live for this attempt and, when the container
 * was allocated remotely, adds its allocated resource to the
 * remotely-allocated usage of the attempt.
 *
 * If an entry with the same id is already registered it is replaced, and
 * any remotely-allocated usage the replaced entry contributed is released
 * first. Without that step a repeated notification for the same container
 * would be counted twice, while {@link #removeRMContainer} would later
 * release it only once.
 *
 * @param id the id under which the container is tracked
 * @param rmContainer the container to track
 */
public synchronized void addRMContainer(
    ContainerId id, RMContainer rmContainer) {
  // put() hands back the entry it displaced, or null if there was none.
  RMContainer previous = liveContainers.put(id, rmContainer);
  if (previous != null && previous.isRemotelyAllocated()) {
    // Undo the displaced entry's share so the counter stays in step with
    // the contents of liveContainers.
    this.attemptResourceUsageAllocatedRemotely.decUsed(
        previous.getAllocatedResource());
  }
  if (rmContainer.isRemotelyAllocated()) {
    this.attemptResourceUsageAllocatedRemotely.incUsed(
        rmContainer.getAllocatedResource());
  }
}
/**
 * Stops tracking the given container as live for this attempt. When the
 * removed container was allocated remotely, its allocated resource is
 * subtracted from the remotely-allocated usage of the attempt. Unknown ids
 * are ignored.
 *
 * @param containerId the id of the container to stop tracking
 */
public synchronized void removeRMContainer(ContainerId containerId) {
  final RMContainer removed = liveContainers.remove(containerId);
  // Nothing to account for if the id was unknown or the RM allocated it.
  if (removed == null || !removed.isRemotelyAllocated()) {
    return;
  }
  attemptResourceUsageAllocatedRemotely.decUsed(
      removed.getAllocatedResource());
}
/**
 * Sets the re-reservation count kept for the given priority back to zero.
 *
 * @param priority the priority whose re-reservation count is reset
 */
protected synchronized void resetReReservations(Priority priority) {
reReservations.setCount(priority, 0);
}

View File

@ -44,15 +44,17 @@
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocolPB;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocolPB;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistSchedAllocateRequestPBImpl;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@ -175,10 +177,15 @@ public Configuration getYarnConfiguration() {
Assert.assertEquals(2,
dsRegResp.getIncrAllocatableCapabilty().getVirtualCores());
DistSchedAllocateRequestPBImpl distAllReq =
(DistSchedAllocateRequestPBImpl)factory.newRecordInstance(
DistSchedAllocateRequest.class);
distAllReq.setAllocateRequest(allReq);
distAllReq.setAllocatedContainers(Arrays.asList(c));
DistSchedAllocateResponse dsAllocResp =
new DistSchedAllocateResponsePBImpl(
dsProxy.allocateForDistributedScheduling(null,
((AllocateRequestPBImpl)allReq).getProto()));
distAllReq.getProto()));
Assert.assertEquals(
"h1", dsAllocResp.getNodesForScheduling().get(0).getHost());
@ -243,8 +250,13 @@ public AllocateResponse allocate(AllocateRequest request) throws
@Override
public DistSchedAllocateResponse allocateForDistributedScheduling(
AllocateRequest request) throws YarnException, IOException {
List<ResourceRequest> askList = request.getAskList();
DistSchedAllocateRequest request) throws YarnException, IOException {
List<ResourceRequest> askList =
request.getAllocateRequest().getAskList();
List<Container> allocatedContainers = request.getAllocatedContainers();
Assert.assertEquals(1, allocatedContainers.size());
Assert.assertEquals(ExecutionType.OPPORTUNISTIC,
allocatedContainers.get(0).getExecutionType());
Assert.assertEquals(1, askList.size());
Assert.assertTrue(askList.get(0)
.getExecutionTypeRequest().getEnforceExecutionType());