From ffd820c27a4f8cf4676ad8758696ed89fde80218 Mon Sep 17 00:00:00 2001 From: Jian He Date: Tue, 21 Jul 2015 16:10:40 -0700 Subject: [PATCH] YARN-1645. ContainerManager implementation to support container resizing. Contributed by Meng Ding & Wangda Tan --- hadoop-yarn-project/CHANGES.txt | 3 + .../CMgrDecreaseContainersResourceEvent.java | 37 ++++ .../ContainerManagerEventType.java | 1 + .../ContainerManagerImpl.java | 180 +++++++++++++++-- .../ChangeContainerResourceEvent.java | 36 ++++ .../container/ContainerEventType.java | 4 + .../nodemanager/DummyContainerManager.java | 6 +- .../TestContainerManagerWithLCE.java | 22 ++ .../BaseContainerManagerTest.java | 43 +++- .../TestContainerManager.java | 190 +++++++++++++++++- 10 files changed, 486 insertions(+), 36 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/CMgrDecreaseContainersResourceEvent.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ChangeContainerResourceEvent.java diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index bf6d9c4154..346fe85e91 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -206,6 +206,9 @@ Release 2.8.0 - UNRELEASED YARN-1449. AM-NM protocol changes to support container resizing. (Meng Ding & Wangda Tan via jianhe) + YARN-1645. ContainerManager implementation to support container resizing. + (Meng Ding & Wangda Tan via jianhe) + IMPROVEMENTS YARN-644. Basic null check is not performed on passed in arguments before diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/CMgrDecreaseContainersResourceEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/CMgrDecreaseContainersResourceEvent.java new file mode 100644 index 0000000000..9479d0bcdd --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/CMgrDecreaseContainersResourceEvent.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager; + +import org.apache.hadoop.yarn.api.records.Container; +import java.util.List; + +public class CMgrDecreaseContainersResourceEvent extends ContainerManagerEvent { + + private final List containersToDecrease; + + public CMgrDecreaseContainersResourceEvent(List + containersToDecrease) { + super(ContainerManagerEventType.DECREASE_CONTAINERS_RESOURCE); + this.containersToDecrease = containersToDecrease; + } + + public List getContainersToDecrease() { + return this.containersToDecrease; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerManagerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerManagerEventType.java index 4278ce0e92..fcb0252217 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerManagerEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerManagerEventType.java @@ -21,4 +21,5 @@ public enum ContainerManagerEventType { FINISH_APPS, FINISH_CONTAINERS, + DECREASE_CONTAINERS_RESOURCE } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index ba1aec2721..890a4e436b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -74,6 +74,7 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.LogAggregationContext; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.SerializedException; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.LogAggregationContextPBImpl; @@ -95,6 +96,7 @@ import org.apache.hadoop.yarn.server.api.ContainerType; import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent; import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedContainersEvent; +import org.apache.hadoop.yarn.server.nodemanager.CMgrDecreaseContainersResourceEvent; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.ContainerManagerEvent; import org.apache.hadoop.yarn.server.nodemanager.Context; @@ -113,6 +115,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationInitEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ChangeContainerResourceEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl; @@ -141,6 +144,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.ByteString; +import org.apache.hadoop.yarn.util.resource.Resources; public class ContainerManagerImpl extends CompositeService implements ServiceStateChangeListener, ContainerManagementProtocol, @@ -681,33 +685,45 @@ protected void authorizeUser(UserGroupInformation remoteUgi, /** * @param containerTokenIdentifier - * of the container to be started + * of the container whose resource is to be started or increased * @throws YarnException */ @Private @VisibleForTesting - protected void authorizeStartRequest(NMTokenIdentifier nmTokenIdentifier, - ContainerTokenIdentifier containerTokenIdentifier) throws YarnException { + protected void authorizeStartAndResourceIncreaseRequest( + NMTokenIdentifier nmTokenIdentifier, + ContainerTokenIdentifier containerTokenIdentifier, + boolean startRequest) + throws YarnException { if (nmTokenIdentifier == null) { throw RPCUtil.getRemoteException(INVALID_NMTOKEN_MSG); } if (containerTokenIdentifier == null) { throw RPCUtil.getRemoteException(INVALID_CONTAINERTOKEN_MSG); } + /* + * Check the following: + * 1. The request comes from the same application attempt + * 2. The request possess a container token that has not expired + * 3. The request possess a container token that is granted by a known RM + */ ContainerId containerId = containerTokenIdentifier.getContainerID(); String containerIDStr = containerId.toString(); boolean unauthorized = false; StringBuilder messageBuilder = - new StringBuilder("Unauthorized request to start container. "); + new StringBuilder("Unauthorized request to " + (startRequest ? + "start container." : "increase container resource.")); if (!nmTokenIdentifier.getApplicationAttemptId().getApplicationId(). equals(containerId.getApplicationAttemptId().getApplicationId())) { unauthorized = true; messageBuilder.append("\nNMToken for application attempt : ") .append(nmTokenIdentifier.getApplicationAttemptId()) - .append(" was used for starting container with container token") + .append(" was used for " + + (startRequest ? "starting " : "increasing resource of ") + + "container with container token") .append(" issued for application attempt : ") .append(containerId.getApplicationAttemptId()); - } else if (!this.context.getContainerTokenSecretManager() + } else if (startRequest && !this.context.getContainerTokenSecretManager() .isValidStartContainerRequest(containerTokenIdentifier)) { // Is the container being relaunched? Or RPC layer let startCall with // tokens generated off old-secret through? @@ -729,6 +745,14 @@ protected void authorizeStartRequest(NMTokenIdentifier nmTokenIdentifier, LOG.error(msg); throw RPCUtil.getRemoteException(msg); } + if (containerTokenIdentifier.getRMIdentifier() != nodeStatusUpdater + .getRMIdentifier()) { + // Is the container coming from unknown RM + StringBuilder sb = new StringBuilder("\nContainer "); + sb.append(containerTokenIdentifier.getContainerID().toString()) + .append(" rejected as it is allocated by a previous RM"); + throw new InvalidContainerException(sb.toString()); + } } /** @@ -745,7 +769,7 @@ protected void authorizeStartRequest(NMTokenIdentifier nmTokenIdentifier, } UserGroupInformation remoteUgi = getRemoteUgi(); NMTokenIdentifier nmTokenIdentifier = selectNMTokenIdentifier(remoteUgi); - authorizeUser(remoteUgi,nmTokenIdentifier); + authorizeUser(remoteUgi, nmTokenIdentifier); List succeededContainers = new ArrayList(); Map failedContainers = new HashMap(); @@ -844,16 +868,8 @@ private void startContainerInternal(NMTokenIdentifier nmTokenIdentifier, * belongs to correct Node Manager (part of retrieve password). c) It has * correct RMIdentifier. d) It is not expired. */ - authorizeStartRequest(nmTokenIdentifier, containerTokenIdentifier); - - if (containerTokenIdentifier.getRMIdentifier() != nodeStatusUpdater - .getRMIdentifier()) { - // Is the container coming from unknown RM - StringBuilder sb = new StringBuilder("\nContainer "); - sb.append(containerTokenIdentifier.getContainerID().toString()) - .append(" rejected as it is allocated by a previous RM"); - throw new InvalidContainerException(sb.toString()); - } + authorizeStartAndResourceIncreaseRequest( + nmTokenIdentifier, containerTokenIdentifier, true); // update NMToken updateNMTokenIdentifier(nmTokenIdentifier); @@ -960,9 +976,118 @@ protected ContainerTokenIdentifier verifyAndGetContainerTokenIdentifier( @Override public IncreaseContainersResourceResponse increaseContainersResource( IncreaseContainersResourceRequest requests) - throws YarnException, IOException { - // To be implemented in YARN-1645 - return null; + throws YarnException, IOException { + if (blockNewContainerRequests.get()) { + throw new NMNotYetReadyException( + "Rejecting container resource increase as NodeManager has not" + + " yet connected with ResourceManager"); + } + UserGroupInformation remoteUgi = getRemoteUgi(); + NMTokenIdentifier nmTokenIdentifier = selectNMTokenIdentifier(remoteUgi); + authorizeUser(remoteUgi, nmTokenIdentifier); + List successfullyIncreasedContainers + = new ArrayList(); + Map failedContainers = + new HashMap(); + // Process container resource increase requests + for (org.apache.hadoop.yarn.api.records.Token token : + requests.getContainersToIncrease()) { + ContainerId containerId = null; + try { + if (token.getIdentifier() == null) { + throw new IOException(INVALID_CONTAINERTOKEN_MSG); + } + ContainerTokenIdentifier containerTokenIdentifier = + BuilderUtils.newContainerTokenIdentifier(token); + verifyAndGetContainerTokenIdentifier(token, + containerTokenIdentifier); + authorizeStartAndResourceIncreaseRequest( + nmTokenIdentifier, containerTokenIdentifier, false); + containerId = containerTokenIdentifier.getContainerID(); + // Reuse the startContainer logic to update NMToken, + // as container resource increase request will have come with + // an updated NMToken. + updateNMTokenIdentifier(nmTokenIdentifier); + Resource resource = containerTokenIdentifier.getResource(); + changeContainerResourceInternal(containerId, resource, true); + successfullyIncreasedContainers.add(containerId); + } catch (YarnException | InvalidToken e) { + failedContainers.put(containerId, SerializedException.newInstance(e)); + } catch (IOException e) { + throw RPCUtil.getRemoteException(e); + } + } + return IncreaseContainersResourceResponse.newInstance( + successfullyIncreasedContainers, failedContainers); + } + + @SuppressWarnings("unchecked") + private void changeContainerResourceInternal( + ContainerId containerId, Resource targetResource, boolean increase) + throws YarnException, IOException { + Container container = context.getContainers().get(containerId); + // Check container existence + if (container == null) { + if (nodeStatusUpdater.isContainerRecentlyStopped(containerId)) { + throw RPCUtil.getRemoteException("Container " + containerId.toString() + + " was recently stopped on node manager."); + } else { + throw RPCUtil.getRemoteException("Container " + containerId.toString() + + " is not handled by this NodeManager"); + } + } + // Check container state + org.apache.hadoop.yarn.server.nodemanager. + containermanager.container.ContainerState currentState = + container.getContainerState(); + if (currentState != org.apache.hadoop.yarn.server. + nodemanager.containermanager.container.ContainerState.RUNNING) { + throw RPCUtil.getRemoteException("Container " + containerId.toString() + + " is in " + currentState.name() + " state." + + " Resource can only be changed when a container is in" + + " RUNNING state"); + } + // Check validity of the target resource. + Resource currentResource = container.getResource(); + if (currentResource.equals(targetResource)) { + LOG.warn("Unable to change resource for container " + + containerId.toString() + + ". The target resource " + + targetResource.toString() + + " is the same as the current resource"); + return; + } + if (increase && !Resources.fitsIn(currentResource, targetResource)) { + throw RPCUtil.getRemoteException("Unable to increase resource for " + + "container " + containerId.toString() + + ". The target resource " + + targetResource.toString() + + " is smaller than the current resource " + + currentResource.toString()); + } + if (!increase && + (!Resources.fitsIn(Resources.none(), targetResource) + || !Resources.fitsIn(targetResource, currentResource))) { + throw RPCUtil.getRemoteException("Unable to decrease resource for " + + "container " + containerId.toString() + + ". The target resource " + + targetResource.toString() + + " is not smaller than the current resource " + + currentResource.toString()); + } + this.readLock.lock(); + try { + if (!serviceStopped) { + dispatcher.getEventHandler().handle(new ChangeContainerResourceEvent( + containerId, targetResource)); + } else { + throw new YarnException( + "Unable to change container resource as the NodeManager is " + + "in the process of shutting down"); + } + } finally { + this.readLock.unlock(); + } } @Private @@ -1182,6 +1307,21 @@ public void handle(ContainerManagerEvent event) { "Container Killed by ResourceManager")); } break; + case DECREASE_CONTAINERS_RESOURCE: + CMgrDecreaseContainersResourceEvent containersDecreasedEvent = + (CMgrDecreaseContainersResourceEvent) event; + for (org.apache.hadoop.yarn.api.records.Container container + : containersDecreasedEvent.getContainersToDecrease()) { + try { + changeContainerResourceInternal(container.getId(), + container.getResource(), false); + } catch (YarnException e) { + LOG.error("Unable to decrease container resource", e); + } catch (IOException e) { + LOG.error("Unable to update container resource in store", e); + } + } + break; default: throw new YarnRuntimeException( "Got an unknown ContainerManagerEvent type: " + event.getType()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ChangeContainerResourceEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ChangeContainerResourceEvent.java new file mode 100644 index 0000000000..3944a3dabe --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ChangeContainerResourceEvent.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.container; + +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.Resource; + +public class ChangeContainerResourceEvent extends ContainerEvent { + + private Resource resource; + + public ChangeContainerResourceEvent(ContainerId c, Resource resource) { + super(c, ContainerEventType.CHANGE_CONTAINER_RESOURCE); + this.resource = resource; + } + + public Resource getResource() { + return this.resource; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java index 5622f8c6e1..dc712bfbed 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java @@ -25,6 +25,10 @@ public enum ContainerEventType { KILL_CONTAINER, UPDATE_DIAGNOSTICS_MSG, CONTAINER_DONE, + CHANGE_CONTAINER_RESOURCE, + + // Producer: ContainerMonitor + CONTAINER_RESOURCE_CHANGED, // DownloadManager CONTAINER_INITED, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java index 349340bb85..3ff04d8ef9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java @@ -191,8 +191,10 @@ public void setBlockNewContainerRequests(boolean blockNewContainerRequests) { } @Override - protected void authorizeStartRequest(NMTokenIdentifier nmTokenIdentifier, - ContainerTokenIdentifier containerTokenIdentifier) throws YarnException { + protected void authorizeStartAndResourceIncreaseRequest( + NMTokenIdentifier nmTokenIdentifier, + ContainerTokenIdentifier containerTokenIdentifier, + boolean startRequest) throws YarnException { // do nothing } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java index a47e7f78e1..9a05278305 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java @@ -189,6 +189,28 @@ public void testStartContainerFailureWithUnknownAuxService() throws Exception { super.testStartContainerFailureWithUnknownAuxService(); } + @Override + public void testIncreaseContainerResourceWithInvalidRequests() throws Exception { + // Don't run the test if the binary is not available. + if (!shouldRunTest()) { + LOG.info("LCE binary path is not passed. Not running the test"); + return; + } + LOG.info("Running testIncreaseContainerResourceWithInvalidRequests"); + super.testIncreaseContainerResourceWithInvalidRequests(); + } + + @Override + public void testIncreaseContainerResourceWithInvalidResource() throws Exception { + // Don't run the test if the binary is not available. + if (!shouldRunTest()) { + LOG.info("LCE binary path is not passed. Not running the test"); + return; + } + LOG.info("Running testIncreaseContainerResourceWithInvalidResource"); + super.testIncreaseContainerResourceWithInvalidResource(); + } + private boolean shouldRunTest() { return System .getProperty(YarnConfiguration.NM_LINUX_CONTAINER_EXECUTOR_PATH) != null; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java index 2810662042..39383428b9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java @@ -209,12 +209,13 @@ protected void authorizeUser(UserGroupInformation remoteUgi, // do nothing } @Override - protected void authorizeStartRequest( - NMTokenIdentifier nmTokenIdentifier, - ContainerTokenIdentifier containerTokenIdentifier) throws YarnException { - // do nothing - } - + protected void authorizeStartAndResourceIncreaseRequest( + NMTokenIdentifier nmTokenIdentifier, + ContainerTokenIdentifier containerTokenIdentifier, + boolean startRequest) throws YarnException { + // do nothing + } + @Override protected void updateNMTokenIdentifier( NMTokenIdentifier nmTokenIdentifier) throws InvalidToken { @@ -310,4 +311,34 @@ static void waitForApplicationState(ContainerManagerImpl containerManager, app.getApplicationState().equals(finalState)); } + public static void waitForNMContainerState(ContainerManagerImpl + containerManager, ContainerId containerID, + org.apache.hadoop.yarn.server.nodemanager.containermanager + .container.ContainerState finalState) + throws InterruptedException, YarnException, IOException { + waitForNMContainerState(containerManager, containerID, finalState, 20); + } + + public static void waitForNMContainerState(ContainerManagerImpl + containerManager, ContainerId containerID, + org.apache.hadoop.yarn.server.nodemanager.containermanager + .container.ContainerState finalState, int timeOutMax) + throws InterruptedException, YarnException, IOException { + Container container = + containerManager.getContext().getContainers().get(containerID); + org.apache.hadoop.yarn.server.nodemanager + .containermanager.container.ContainerState currentState = + container.getContainerState(); + int timeoutSecs = 0; + while (!currentState.equals(finalState) + && timeoutSecs++ < timeOutMax) { + Thread.sleep(1000); + LOG.info("Waiting for NM container to get into state " + finalState + + ". Current state is " + currentState); + currentState = container.getContainerState(); + } + LOG.info("Container state is " + currentState); + Assert.assertEquals("ContainerState is not correct (timedout)", + finalState, currentState); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java index e508424e48..e2f12ba9d5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java @@ -38,6 +38,8 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.Service; import org.apache.hadoop.util.Shell; +import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest; +import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; @@ -72,6 +74,7 @@ import org.apache.hadoop.yarn.security.NMTokenIdentifier; import org.apache.hadoop.yarn.server.api.ResourceManagerConstants; import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent; +import org.apache.hadoop.yarn.server.nodemanager.CMgrDecreaseContainersResourceEvent; import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.TestAuxServices.ServiceA; @@ -87,6 +90,8 @@ import org.junit.Test; import org.mockito.Mockito; +import static org.junit.Assert.assertEquals; + public class TestContainerManager extends BaseContainerManagerTest { public TestContainerManager() throws UnsupportedFileSystemException { @@ -803,7 +808,8 @@ public void testNullTokens() throws Exception { metrics, dirsHandler); String strExceptionMsg = ""; try { - cMgrImpl.authorizeStartRequest(null, new ContainerTokenIdentifier()); + cMgrImpl.authorizeStartAndResourceIncreaseRequest( + null, new ContainerTokenIdentifier(), true); } catch(YarnException ye) { strExceptionMsg = ye.getMessage(); } @@ -812,7 +818,8 @@ public void testNullTokens() throws Exception { strExceptionMsg = ""; try { - cMgrImpl.authorizeStartRequest(new NMTokenIdentifier(), null); + cMgrImpl.authorizeStartAndResourceIncreaseRequest( + new NMTokenIdentifier(), null, true); } catch(YarnException ye) { strExceptionMsg = ye.getMessage(); } @@ -878,6 +885,167 @@ public void testNullTokens() throws Exception { ContainerManagerImpl.INVALID_CONTAINERTOKEN_MSG); } + @Test + public void testIncreaseContainerResourceWithInvalidRequests() throws Exception { + containerManager.start(); + // Start 4 containers 0..4 with default resource (1024, 1) + List list = new ArrayList<>(); + ContainerLaunchContext containerLaunchContext = recordFactory + .newRecordInstance(ContainerLaunchContext.class); + for (int i = 0; i < 4; i++) { + ContainerId cId = createContainerId(i); + long identifier = DUMMY_RM_IDENTIFIER; + Token containerToken = createContainerToken(cId, identifier, + context.getNodeId(), user, context.getContainerTokenSecretManager()); + StartContainerRequest request = StartContainerRequest.newInstance( + containerLaunchContext, containerToken); + list.add(request); + } + StartContainersRequest requestList = StartContainersRequest + .newInstance(list); + StartContainersResponse response = containerManager + .startContainers(requestList); + + Assert.assertEquals(4, response.getSuccessfullyStartedContainers().size()); + int i = 0; + for (ContainerId id : response.getSuccessfullyStartedContainers()) { + Assert.assertEquals(i, id.getContainerId()); + i++; + } + + Thread.sleep(2000); + // Construct container resource increase request, + List increaseTokens = new ArrayList(); + // Add increase request for container-0, the request will fail as the + // container will have exited, and won't be in RUNNING state + ContainerId cId0 = createContainerId(0); + Token containerToken = + createContainerToken(cId0, DUMMY_RM_IDENTIFIER, + context.getNodeId(), user, + Resource.newInstance(1234, 3), + context.getContainerTokenSecretManager(), null); + increaseTokens.add(containerToken); + // Add increase request for container-7, the request will fail as the + // container does not exist + ContainerId cId7 = createContainerId(7); + containerToken = + createContainerToken(cId7, DUMMY_RM_IDENTIFIER, + context.getNodeId(), user, + Resource.newInstance(1234, 3), + context.getContainerTokenSecretManager(), null); + increaseTokens.add(containerToken); + + IncreaseContainersResourceRequest increaseRequest = + IncreaseContainersResourceRequest + .newInstance(increaseTokens); + IncreaseContainersResourceResponse increaseResponse = + containerManager.increaseContainersResource(increaseRequest); + // Check response + Assert.assertEquals( + 0, increaseResponse.getSuccessfullyIncreasedContainers().size()); + Assert.assertEquals(2, increaseResponse.getFailedRequests().size()); + for (Map.Entry entry : increaseResponse + .getFailedRequests().entrySet()) { + Assert.assertNotNull("Failed message", entry.getValue().getMessage()); + if (cId0.equals(entry.getKey())) { + Assert.assertTrue(entry.getValue().getMessage() + .contains("Resource can only be changed when a " + + "container is in RUNNING state")); + } else if (cId7.equals(entry.getKey())) { + Assert.assertTrue(entry.getValue().getMessage() + .contains("Container " + cId7.toString() + + " is not handled by this NodeManager")); + } else { + throw new YarnException("Received failed request from wrong" + + " container: " + entry.getKey().toString()); + } + } + } + + @Test + public void testIncreaseContainerResourceWithInvalidResource() throws Exception { + containerManager.start(); + File scriptFile = Shell.appendScriptExtension(tmpDir, "scriptFile"); + PrintWriter fileWriter = new PrintWriter(scriptFile); + // Construct the Container-id + ContainerId cId = createContainerId(0); + if (Shell.WINDOWS) { + fileWriter.println("@ping -n 100 127.0.0.1 >nul"); + } else { + fileWriter.write("\numask 0"); + fileWriter.write("\nexec sleep 100"); + } + fileWriter.close(); + ContainerLaunchContext containerLaunchContext = + recordFactory.newRecordInstance(ContainerLaunchContext.class); + URL resource_alpha = + ConverterUtils.getYarnUrlFromPath(localFS + .makeQualified(new Path(scriptFile.getAbsolutePath()))); + LocalResource rsrc_alpha = + recordFactory.newRecordInstance(LocalResource.class); + rsrc_alpha.setResource(resource_alpha); + rsrc_alpha.setSize(-1); + rsrc_alpha.setVisibility(LocalResourceVisibility.APPLICATION); + rsrc_alpha.setType(LocalResourceType.FILE); + rsrc_alpha.setTimestamp(scriptFile.lastModified()); + String destinationFile = "dest_file"; + Map localResources = + new HashMap(); + localResources.put(destinationFile, rsrc_alpha); + containerLaunchContext.setLocalResources(localResources); + List commands = + Arrays.asList(Shell.getRunScriptCommand(scriptFile)); + containerLaunchContext.setCommands(commands); + + StartContainerRequest scRequest = + StartContainerRequest.newInstance( + containerLaunchContext, + createContainerToken(cId, DUMMY_RM_IDENTIFIER, context.getNodeId(), + user, context.getContainerTokenSecretManager())); + List list = new ArrayList(); + list.add(scRequest); + StartContainersRequest allRequests = + StartContainersRequest.newInstance(list); + containerManager.startContainers(allRequests); + // Make sure the container reaches RUNNING state + BaseContainerManagerTest.waitForNMContainerState(containerManager, cId, + org.apache.hadoop.yarn.server.nodemanager. + containermanager.container.ContainerState.RUNNING); + // Construct container resource increase request, + List increaseTokens = new ArrayList(); + // Add increase request. The increase request should fail + // as the current resource does not fit in the target resource + Token containerToken = + createContainerToken(cId, DUMMY_RM_IDENTIFIER, + context.getNodeId(), user, + Resource.newInstance(512, 1), + context.getContainerTokenSecretManager(), null); + increaseTokens.add(containerToken); + IncreaseContainersResourceRequest increaseRequest = + IncreaseContainersResourceRequest + .newInstance(increaseTokens); + IncreaseContainersResourceResponse increaseResponse = + containerManager.increaseContainersResource(increaseRequest); + // Check response + Assert.assertEquals( + 0, increaseResponse.getSuccessfullyIncreasedContainers().size()); + Assert.assertEquals(1, increaseResponse.getFailedRequests().size()); + for (Map.Entry entry : increaseResponse + .getFailedRequests().entrySet()) { + if (cId.equals(entry.getKey())) { + Assert.assertNotNull("Failed message", entry.getValue().getMessage()); + Assert.assertTrue(entry.getValue().getMessage() + .contains("The target resource " + + Resource.newInstance(512, 1).toString() + + " is smaller than the current resource " + + Resource.newInstance(1024, 1))); + } else { + throw new YarnException("Received failed request from wrong" + + " container: " + entry.getKey().toString()); + } + } + } + public static Token createContainerToken(ContainerId cId, long rmIdentifier, NodeId nodeId, String user, NMContainerTokenSecretManager containerTokenSecretManager) @@ -892,15 +1060,21 @@ public static Token createContainerToken(ContainerId cId, long rmIdentifier, LogAggregationContext logAggregationContext) throws IOException { Resource r = BuilderUtils.newResource(1024, 1); + return createContainerToken(cId, rmIdentifier, nodeId, user, r, + containerTokenSecretManager, logAggregationContext); + } + + public static Token createContainerToken(ContainerId cId, long rmIdentifier, + NodeId nodeId, String user, Resource resource, + NMContainerTokenSecretManager containerTokenSecretManager, + LogAggregationContext logAggregationContext) + throws IOException { ContainerTokenIdentifier containerTokenIdentifier = - new ContainerTokenIdentifier(cId, nodeId.toString(), user, r, + new ContainerTokenIdentifier(cId, nodeId.toString(), user, resource, System.currentTimeMillis() + 100000L, 123, rmIdentifier, Priority.newInstance(0), 0, logAggregationContext, null); - Token containerToken = - BuilderUtils - .newContainerToken(nodeId, containerTokenSecretManager - .retrievePassword(containerTokenIdentifier), + return BuilderUtils.newContainerToken(nodeId, containerTokenSecretManager + .retrievePassword(containerTokenIdentifier), containerTokenIdentifier); - return containerToken; } }