YARN-1417. Modified RM to generate container-tokens not at creation time, but at allocation time so as to prevent RM
from shelling out containers with expired tokens. Contributed by Omkar Vinit Joshi and Jian He. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1568060 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
2417ca71d5
commit
d0a5e43de7
@ -269,6 +269,10 @@ Release 2.4.0 - UNRELEASED
|
|||||||
YARN-1578. Fixed reading incomplete application attempt and container data
|
YARN-1578. Fixed reading incomplete application attempt and container data
|
||||||
in FileSystemApplicationHistoryStore. (Shinichi Yamashita via zjshen)
|
in FileSystemApplicationHistoryStore. (Shinichi Yamashita via zjshen)
|
||||||
|
|
||||||
|
YARN-1417. Modified RM to generate container-tokens not at creation time, but
|
||||||
|
at allocation time so as to prevent RM from shelling out containers with
|
||||||
|
expired tokens. (Omkar Vinit Joshi and Jian He via vinodkv)
|
||||||
|
|
||||||
Release 2.3.1 - UNRELEASED
|
Release 2.3.1 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
@ -345,6 +345,11 @@ public synchronized List<Container> pullNewlyAllocatedContainers() {
|
|||||||
for (RMContainer rmContainer : newlyAllocatedContainers) {
|
for (RMContainer rmContainer : newlyAllocatedContainers) {
|
||||||
rmContainer.handle(new RMContainerEvent(rmContainer.getContainerId(),
|
rmContainer.handle(new RMContainerEvent(rmContainer.getContainerId(),
|
||||||
RMContainerEventType.ACQUIRED));
|
RMContainerEventType.ACQUIRED));
|
||||||
|
Container container = rmContainer.getContainer();
|
||||||
|
rmContainer.getContainer().setContainerToken(
|
||||||
|
rmContext.getContainerTokenSecretManager().createContainerToken(
|
||||||
|
rmContainer.getContainerId(), container.getNodeId(), getUser(),
|
||||||
|
container.getResource()));
|
||||||
returnContainerList.add(rmContainer.getContainer());
|
returnContainerList.add(rmContainer.getContainer());
|
||||||
}
|
}
|
||||||
newlyAllocatedContainers.clear();
|
newlyAllocatedContainers.clear();
|
||||||
|
@ -1292,16 +1292,6 @@ Container createContainer(FiCaSchedulerApp application, FiCaSchedulerNode node,
|
|||||||
return container;
|
return container;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Create <code>ContainerToken</code>, only in secure-mode
|
|
||||||
*/
|
|
||||||
Token createContainerToken(
|
|
||||||
FiCaSchedulerApp application, Container container) {
|
|
||||||
return containerTokenSecretManager.createContainerToken(
|
|
||||||
container.getId(), container.getNodeId(),
|
|
||||||
application.getUser(), container.getResource());
|
|
||||||
}
|
|
||||||
|
|
||||||
private Resource assignContainer(Resource clusterResource, FiCaSchedulerNode node,
|
private Resource assignContainer(Resource clusterResource, FiCaSchedulerNode node,
|
||||||
FiCaSchedulerApp application, Priority priority,
|
FiCaSchedulerApp application, Priority priority,
|
||||||
ResourceRequest request, NodeType type, RMContainer rmContainer) {
|
ResourceRequest request, NodeType type, RMContainer rmContainer) {
|
||||||
@ -1345,14 +1335,6 @@ private Resource assignContainer(Resource clusterResource, FiCaSchedulerNode nod
|
|||||||
unreserve(application, priority, node, rmContainer);
|
unreserve(application, priority, node, rmContainer);
|
||||||
}
|
}
|
||||||
|
|
||||||
Token containerToken =
|
|
||||||
createContainerToken(application, container);
|
|
||||||
if (containerToken == null) {
|
|
||||||
// Something went wrong...
|
|
||||||
return Resources.none();
|
|
||||||
}
|
|
||||||
container.setContainerToken(containerToken);
|
|
||||||
|
|
||||||
// Inform the application
|
// Inform the application
|
||||||
RMContainer allocatedContainer =
|
RMContainer allocatedContainer =
|
||||||
application.allocate(type, node, priority, request, container);
|
application.allocate(type, node, priority, request, container);
|
||||||
|
@ -151,17 +151,11 @@ public Container createContainer(
|
|||||||
NodeId nodeId = node.getRMNode().getNodeID();
|
NodeId nodeId = node.getRMNode().getNodeID();
|
||||||
ContainerId containerId = BuilderUtils.newContainerId(application
|
ContainerId containerId = BuilderUtils.newContainerId(application
|
||||||
.getApplicationAttemptId(), application.getNewContainerId());
|
.getApplicationAttemptId(), application.getNewContainerId());
|
||||||
org.apache.hadoop.yarn.api.records.Token containerToken =
|
|
||||||
containerTokenSecretManager.createContainerToken(containerId, nodeId,
|
|
||||||
application.getUser(), capability);
|
|
||||||
if (containerToken == null) {
|
|
||||||
return null; // Try again later.
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create the container
|
// Create the container
|
||||||
Container container =
|
Container container =
|
||||||
BuilderUtils.newContainer(containerId, nodeId, node.getRMNode()
|
BuilderUtils.newContainer(containerId, nodeId, node.getRMNode()
|
||||||
.getHttpAddress(), capability, priority, containerToken);
|
.getHttpAddress(), capability, priority, null);
|
||||||
|
|
||||||
return container;
|
return container;
|
||||||
}
|
}
|
||||||
|
@ -654,20 +654,11 @@ private int assignContainer(FiCaSchedulerNode node, FiCaSchedulerApp application
|
|||||||
NodeId nodeId = node.getRMNode().getNodeID();
|
NodeId nodeId = node.getRMNode().getNodeID();
|
||||||
ContainerId containerId = BuilderUtils.newContainerId(application
|
ContainerId containerId = BuilderUtils.newContainerId(application
|
||||||
.getApplicationAttemptId(), application.getNewContainerId());
|
.getApplicationAttemptId(), application.getNewContainerId());
|
||||||
Token containerToken = null;
|
|
||||||
|
|
||||||
containerToken =
|
|
||||||
this.rmContext.getContainerTokenSecretManager()
|
|
||||||
.createContainerToken(containerId, nodeId, application.getUser(),
|
|
||||||
capability);
|
|
||||||
if (containerToken == null) {
|
|
||||||
return i; // Try again later.
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create the container
|
// Create the container
|
||||||
Container container =
|
Container container =
|
||||||
BuilderUtils.newContainer(containerId, nodeId, node.getRMNode()
|
BuilderUtils.newContainer(containerId, nodeId, node.getRMNode()
|
||||||
.getHttpAddress(), capability, priority, containerToken);
|
.getHttpAddress(), capability, priority, null);
|
||||||
|
|
||||||
// Allocate!
|
// Allocate!
|
||||||
|
|
||||||
|
@ -142,8 +142,15 @@ public void waitForContainerAllocated(MockNM nm, ContainerId containerId)
|
|||||||
public void waitForState(MockNM nm, ContainerId containerId,
|
public void waitForState(MockNM nm, ContainerId containerId,
|
||||||
RMContainerState containerState) throws Exception {
|
RMContainerState containerState) throws Exception {
|
||||||
RMContainer container = getResourceScheduler().getRMContainer(containerId);
|
RMContainer container = getResourceScheduler().getRMContainer(containerId);
|
||||||
Assert.assertNotNull("Container shouldn't be null", container);
|
|
||||||
int timeoutSecs = 0;
|
int timeoutSecs = 0;
|
||||||
|
while(container == null && timeoutSecs++ < 20) {
|
||||||
|
nm.nodeHeartbeat(true);
|
||||||
|
container = getResourceScheduler().getRMContainer(containerId);
|
||||||
|
System.out.println("Waiting for container " + containerId + " to be allocated.");
|
||||||
|
Thread.sleep(100);
|
||||||
|
}
|
||||||
|
Assert.assertNotNull("Container shouldn't be null", container);
|
||||||
|
timeoutSecs = 0;
|
||||||
while (!containerState.equals(container.getState()) && timeoutSecs++ < 40) {
|
while (!containerState.equals(container.getState()) && timeoutSecs++ < 40) {
|
||||||
System.out.println("Container : " + containerId + " State is : "
|
System.out.println("Container : " + containerId + " State is : "
|
||||||
+ container.getState() + " Waiting for state : " + containerState);
|
+ container.getState() + " Waiting for state : " + containerState);
|
||||||
|
@ -18,11 +18,17 @@
|
|||||||
|
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
import junit.framework.Assert;
|
import junit.framework.Assert;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Container;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
|
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
||||||
@ -30,6 +36,9 @@
|
|||||||
import org.apache.hadoop.yarn.server.resourcemanager.TestFifoScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.TestFifoScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
|
||||||
@ -106,4 +115,38 @@ public void testExcessReservationThanNodeManagerCapacity() throws Exception {
|
|||||||
|
|
||||||
rm.stop();
|
rm.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// This is to test container tokens are generated when the containers are
|
||||||
|
// acquired by the AM, not when the containers are allocated
|
||||||
|
@Test
|
||||||
|
public void testContainerTokenGeneratedOnPullRequest() throws Exception {
|
||||||
|
YarnConfiguration conf = new YarnConfiguration();
|
||||||
|
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
|
||||||
|
ResourceScheduler.class);
|
||||||
|
MockRM rm1 = new MockRM(conf);
|
||||||
|
rm1.start();
|
||||||
|
MockNM nm1 = rm1.registerNode("127.0.0.1:1234", 8000);
|
||||||
|
RMApp app1 = rm1.submitApp(200);
|
||||||
|
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
||||||
|
// request a container.
|
||||||
|
am1.allocate("127.0.0.1", 1024, 1, new ArrayList<ContainerId>());
|
||||||
|
ContainerId containerId2 =
|
||||||
|
ContainerId.newInstance(am1.getApplicationAttemptId(), 2);
|
||||||
|
rm1.waitForState(nm1, containerId2, RMContainerState.ALLOCATED);
|
||||||
|
|
||||||
|
RMContainer container =
|
||||||
|
rm1.getResourceScheduler().getRMContainer(containerId2);
|
||||||
|
// no container token is generated.
|
||||||
|
Assert.assertEquals(containerId2, container.getContainerId());
|
||||||
|
Assert.assertNull(container.getContainer().getContainerToken());
|
||||||
|
|
||||||
|
// acquire the container.
|
||||||
|
List<Container> containers =
|
||||||
|
am1.allocate(new ArrayList<ResourceRequest>(),
|
||||||
|
new ArrayList<ContainerId>()).getAllocatedContainers();
|
||||||
|
Assert.assertEquals(containerId2, containers.get(0).getId());
|
||||||
|
// container token is generated.
|
||||||
|
Assert.assertNotNull(containers.get(0).getContainerToken());
|
||||||
|
rm1.stop();
|
||||||
|
}
|
||||||
}
|
}
|
Loading…
Reference in New Issue
Block a user