diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java index ac9fbb7070..ef5d72c6b7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java @@ -96,4 +96,11 @@ public interface Container extends EventHandler { void sendKillEvent(int exitStatus, String description); boolean isRecovering(); + + /** + * Get assigned resource mappings to the container. + * + * @return Resource Mappings of the container + */ + ResourceMappings getResourceMappings(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java index 772b6e7660..a768d18bb6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java @@ -185,6 +185,7 @@ private ReInitializationContext createContextForRollback() { private boolean 
recoveredAsKilled = false; private Context context; private ResourceSet resourceSet; + private ResourceMappings resourceMappings; public ContainerImpl(Configuration conf, Dispatcher dispatcher, ContainerLaunchContext launchContext, Credentials creds, @@ -242,6 +243,7 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher, stateMachine = stateMachineFactory.make(this); this.context = context; this.resourceSet = new ResourceSet(); + this.resourceMappings = new ResourceMappings(); } private static ContainerRetryContext configureRetryContext( @@ -282,6 +284,7 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher, this.remainingRetryAttempts = rcs.getRemainingRetryAttempts(); this.workDir = rcs.getWorkDir(); this.logDir = rcs.getLogDir(); + this.resourceMappings = rcs.getResourceMappings(); } private static final ContainerDiagnosticsUpdateTransition UPDATE_DIAGNOSTICS_TRANSITION = @@ -1789,4 +1792,14 @@ public boolean isRecovering() { getContainerState() == ContainerState.NEW); return isRecovering; } + + /** + * Get assigned resource mappings to the container. 
+ * + * @return Resource Mappings of the container + */ + @Override + public ResourceMappings getResourceMappings() { + return resourceMappings; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ResourceMappings.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ResourceMappings.java new file mode 100644 index 0000000000..d673341b01 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ResourceMappings.java @@ -0,0 +1,124 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.container; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.commons.io.IOUtils; + +/** + * This class is used to store the resources assigned to a single container, + * keyed by resource type. + * + * Assigned resources could be a list of Strings. + * + * For example, a container can be assigned: + * "numa": ["numa0"] + * "gpu": ["0", "1", "2", "3"] + * "fpga": ["1", "3"] + * + * This will be used for NM restart container recovery. + */ +public class ResourceMappings { + + private Map assignedResourcesMap = new HashMap<>(); + + /** + * Get the resources assigned for the given resource type. + * @param resourceType resource type to look up + * @return list of assigned resources, or an empty list if none are stored + */ + public List getAssignedResources(String resourceType) { + AssignedResources ar = assignedResourcesMap.get(resourceType); + if (null == ar) { + return Collections.emptyList(); + } + return ar.getAssignedResources(); + } + + /** + * Sets the resources for a given resource type, replacing any previous ones. + * + * @param resourceType Resource Type + * @param assigned Assigned resources to store for the resource type + */ + public void addAssignedResources(String resourceType, + AssignedResources assigned) { + assignedResourcesMap.put(resourceType, assigned); + } + + /** + * Stores resources assigned to a container for a given resource type. 
+ */ + public static class AssignedResources implements Serializable { + private static final long serialVersionUID = -1059491941955757926L; + private List resources = Collections.emptyList(); + + public List getAssignedResources() { + return Collections.unmodifiableList(resources); + } + + public void updateAssignedResources(List list) { + this.resources = new ArrayList<>(list); + } + + @SuppressWarnings("unchecked") + public static AssignedResources fromBytes(byte[] bytes) + throws IOException { + ObjectInputStream ois = null; + List resources; + try { + ByteArrayInputStream bis = new ByteArrayInputStream(bytes); + ois = new ObjectInputStream(bis); + resources = (List) ois.readObject(); + } catch (ClassNotFoundException e) { + throw new IOException(e); + } finally { + IOUtils.closeQuietly(ois); + } + AssignedResources ar = new AssignedResources(); + ar.updateAssignedResources(resources); + return ar; + } + + public byte[] toBytes() throws IOException { + ObjectOutputStream oos = null; + byte[] bytes; + try { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + oos = new ObjectOutputStream(bos); + oos.writeObject(resources); + bytes = bos.toByteArray(); + } finally { + IOUtils.closeQuietly(oos); + } + return bytes; + } + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java index a31756e1c9..db931f8e62 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java @@ -24,6 +24,7 @@ import java.io.File; import java.io.IOException; +import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -39,6 +40,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.Time; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StartContainerRequestPBImpl; @@ -60,6 +62,7 @@ import org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainerRequestProto; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings; import org.apache.hadoop.yarn.server.records.Version; import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl; import org.apache.hadoop.yarn.server.utils.LeveldbIterator; @@ -144,6 +147,9 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { private static final String AMRMPROXY_KEY_PREFIX = "AMRMProxy/"; + private static final String CONTAINER_ASSIGNED_RESOURCES_KEY_SUFFIX = + "/assignedResources_"; + private static final byte[] EMPTY_VALUE = new byte[0]; private DB db; @@ -286,6 +292,13 @@ private RecoveredContainerState loadContainerState(ContainerId containerId, rcs.setWorkDir(asString(entry.getValue())); } else if (suffix.equals(CONTAINER_LOG_DIR_KEY_SUFFIX)) { rcs.setLogDir(asString(entry.getValue())); + } else if (suffix.startsWith(CONTAINER_ASSIGNED_RESOURCES_KEY_SUFFIX)) { + String resourceType = suffix.substring( + CONTAINER_ASSIGNED_RESOURCES_KEY_SUFFIX.length()); + 
ResourceMappings.AssignedResources assignedResources = + ResourceMappings.AssignedResources.fromBytes(entry.getValue()); + rcs.getResourceMappings().addAssignedResources(resourceType, + assignedResources); } else { LOG.warn("the container " + containerId + " will be killed because of the unknown key " + key @@ -1091,6 +1104,35 @@ public void removeLogDeleter(ApplicationId appId) throws IOException { } } + @Override + public void storeAssignedResources(ContainerId containerId, + String resourceType, List assignedResources) + throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("storeAssignedResources: containerId=" + containerId + + ", assignedResources=" + StringUtils.join(",", assignedResources)); + } + + String keyResChng = CONTAINERS_KEY_PREFIX + containerId.toString() + + CONTAINER_ASSIGNED_RESOURCES_KEY_SUFFIX + resourceType; + try { + WriteBatch batch = db.createWriteBatch(); + try { + ResourceMappings.AssignedResources res = + new ResourceMappings.AssignedResources(); + res.updateAssignedResources(assignedResources); + + // New value will overwrite old values for the same key + batch.put(bytes(keyResChng), res.toBytes()); + db.write(batch); + } finally { + batch.close(); + } + } catch (DBException e) { + throw new IOException(e); + } + } + @SuppressWarnings("deprecation") private void cleanupDeprecatedFinishedApps() { try { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java index 86dc99fdea..dc1cece710 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.nodemanager.recovery; import java.io.IOException; +import java.io.Serializable; import java.util.List; import org.apache.hadoop.conf.Configuration; @@ -257,6 +258,12 @@ public void removeAMRMProxyAppContext(ApplicationAttemptId attempt) throws IOException { } + @Override + public void storeAssignedResources(ContainerId containerId, + String resourceType, List assignedResources) + throws IOException { + } + @Override protected void initStorage(Configuration conf) throws IOException { } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java index ec534bff7e..62a2b9fd71 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.nodemanager.recovery; import java.io.IOException; +import java.io.Serializable; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -42,6 +43,7 @@ import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LogDeleterProto; import org.apache.hadoop.yarn.server.api.records.MasterKey; +import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings; @Private @Unstable @@ -88,6 +90,7 @@ public static class RecoveredContainerState { private RecoveredContainerType recoveryType = RecoveredContainerType.RECOVER; private long startTime; + private ResourceMappings resMappings = new ResourceMappings(); public RecoveredContainerStatus getStatus() { return status; @@ -172,6 +175,14 @@ public RecoveredContainerType getRecoveryType() { public void setRecoveryType(RecoveredContainerType recoveryType) { this.recoveryType = recoveryType; } + + public ResourceMappings getResourceMappings() { + return resMappings; + } + + public void setResourceMappings(ResourceMappings mappings) { + this.resMappings = mappings; + } } public static class LocalResourceTrackerState { @@ -699,6 +710,18 @@ public abstract void removeAMRMProxyAppContextEntry( public abstract void removeAMRMProxyAppContext(ApplicationAttemptId attempt) throws IOException; + /** + * Store the assigned resources to a container. 
+ * + * @param containerId Container Id + * @param resourceType Resource Type + * @param assignedResources Assigned resources + * @throws IOException if fails + */ + public abstract void storeAssignedResources(ContainerId containerId, + String resourceType, List assignedResources) + throws IOException; + protected abstract void initStorage(Configuration conf) throws IOException; protected abstract void startStorage() throws IOException; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java index 224e99cf9f..5ec0ae64c3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java @@ -31,6 +31,7 @@ import java.io.File; import java.io.IOException; import java.io.PrintWriter; +import java.io.Serializable; import java.nio.ByteBuffer; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; @@ -90,6 +91,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings; import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncher; import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService; @@ -108,6 +110,7 @@ import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.util.timeline.TimelineUtils; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -400,9 +403,8 @@ public void testContainerResizeRecovery() throws Exception { NMStateStoreService stateStore = new NMMemoryStateStoreService(); stateStore.init(conf); stateStore.start(); - Context context = createContext(conf, stateStore); + context = createContext(conf, stateStore); ContainerManagerImpl cm = createContainerManager(context, delSrvc); - cm.dispatcher.disableExitOnDispatchException(); cm.init(conf); cm.start(); // add an application by starting a container @@ -410,55 +412,12 @@ public void testContainerResizeRecovery() throws Exception { ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1); ContainerId cid = ContainerId.newContainerId(attemptId, 1); - Map containerEnv = new HashMap<>(); - setFlowContext(containerEnv, "app_name1", appId); - Map serviceData = Collections.emptyMap(); - Credentials containerCreds = new Credentials(); - DataOutputBuffer dob = new DataOutputBuffer(); - containerCreds.writeTokenStorageToStream(dob); - ByteBuffer containerTokens = ByteBuffer.wrap(dob.getData(), 0, - dob.getLength()); - Map acls = Collections.emptyMap(); - File tmpDir = new File("target", - this.getClass().getSimpleName() + "-tmpDir"); - File scriptFile = Shell.appendScriptExtension(tmpDir, "scriptFile"); - PrintWriter fileWriter = new PrintWriter(scriptFile); - if (Shell.WINDOWS) { - fileWriter.println("@ping -n 100 127.0.0.1 >nul"); - } else { - fileWriter.write("\numask 0"); - fileWriter.write("\nexec sleep 100"); - } - 
fileWriter.close(); - FileContext localFS = FileContext.getLocalFSFileContext(); - URL resource_alpha = - URL.fromPath(localFS - .makeQualified(new Path(scriptFile.getAbsolutePath()))); - LocalResource rsrc_alpha = RecordFactoryProvider - .getRecordFactory(null).newRecordInstance(LocalResource.class); - rsrc_alpha.setResource(resource_alpha); - rsrc_alpha.setSize(-1); - rsrc_alpha.setVisibility(LocalResourceVisibility.APPLICATION); - rsrc_alpha.setType(LocalResourceType.FILE); - rsrc_alpha.setTimestamp(scriptFile.lastModified()); - String destinationFile = "dest_file"; - Map localResources = new HashMap<>(); - localResources.put(destinationFile, rsrc_alpha); - List commands = - Arrays.asList(Shell.getRunScriptCommand(scriptFile)); - ContainerLaunchContext clc = ContainerLaunchContext.newInstance( - localResources, containerEnv, commands, serviceData, - containerTokens, acls); - StartContainersResponse startResponse = startContainer( - context, cm, cid, clc, null); - assertTrue(startResponse.getFailedRequests().isEmpty()); - assertEquals(1, context.getApplications().size()); + + commonLaunchContainer(appId, cid, cm); + Application app = context.getApplications().get(appId); assertNotNull(app); - // make sure the container reaches RUNNING state - waitForNMContainerState(cm, cid, - org.apache.hadoop.yarn.server.nodemanager - .containermanager.container.ContainerState.RUNNING); + Resource targetResource = Resource.newInstance(2048, 2); ContainerUpdateResponse updateResponse = updateContainers(context, cm, cid, targetResource); @@ -480,6 +439,58 @@ public void testContainerResizeRecovery() throws Exception { assertEquals(targetResource, containerStatus.getCapability()); } + @Test + public void testResourceMappingRecoveryForContainer() throws Exception { + conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true); + conf.setBoolean(YarnConfiguration.NM_RECOVERY_SUPERVISED, true); + NMStateStoreService stateStore = new NMMemoryStateStoreService(); + 
stateStore.init(conf); + stateStore.start(); + context = createContext(conf, stateStore); + ContainerManagerImpl cm = createContainerManager(context, delSrvc); + cm.init(conf); + cm.start(); + + // add an application by starting a container + ApplicationId appId = ApplicationId.newInstance(0, 1); + ApplicationAttemptId attemptId = + ApplicationAttemptId.newInstance(appId, 1); + ContainerId cid = ContainerId.newContainerId(attemptId, 1); + + commonLaunchContainer(appId, cid, cm); + + Application app = context.getApplications().get(appId); + assertNotNull(app); + + // store resource mapping of the container + List gpuResources = Arrays.asList("1", "2", "3"); + stateStore.storeAssignedResources(cid, "gpu", gpuResources); + List numaResources = Arrays.asList("numa1"); + stateStore.storeAssignedResources(cid, "numa", numaResources); + List fpgaResources = Arrays.asList("fpga1", "fpga2"); + stateStore.storeAssignedResources(cid, "fpga", fpgaResources); + + cm.stop(); + context = createContext(conf, stateStore); + cm = createContainerManager(context); + cm.init(conf); + cm.start(); + assertEquals(1, context.getApplications().size()); + app = context.getApplications().get(appId); + assertNotNull(app); + + Container nmContainer = context.getContainers().get(cid); + Assert.assertNotNull(nmContainer); + ResourceMappings resourceMappings = nmContainer.getResourceMappings(); + List assignedResource = resourceMappings + .getAssignedResources("gpu"); + Assert.assertTrue(assignedResource.equals(gpuResources)); + Assert.assertTrue( + resourceMappings.getAssignedResources("numa").equals(numaResources)); + Assert.assertTrue( + resourceMappings.getAssignedResources("fpga").equals(fpgaResources)); + } + @Test public void testContainerCleanupOnShutdown() throws Exception { ApplicationId appId = ApplicationId.newInstance(0, 1); @@ -552,6 +563,57 @@ public void testContainerCleanupOnShutdown() throws Exception { verify(cm, never()).handle(isA(CMgrCompletedAppsEvent.class)); } + private 
void commonLaunchContainer(ApplicationId appId, ContainerId cid, + ContainerManagerImpl cm) throws Exception { + Map containerEnv = new HashMap<>(); + setFlowContext(containerEnv, "app_name1", appId); + Map serviceData = Collections.emptyMap(); + Credentials containerCreds = new Credentials(); + DataOutputBuffer dob = new DataOutputBuffer(); + containerCreds.writeTokenStorageToStream(dob); + ByteBuffer containerTokens = ByteBuffer.wrap(dob.getData(), 0, + dob.getLength()); + Map acls = Collections.emptyMap(); + File tmpDir = new File("target", + this.getClass().getSimpleName() + "-tmpDir"); + File scriptFile = Shell.appendScriptExtension(tmpDir, "scriptFile"); + PrintWriter fileWriter = new PrintWriter(scriptFile); + if (Shell.WINDOWS) { + fileWriter.println("@ping -n 100 127.0.0.1 >nul"); + } else { + fileWriter.write("\numask 0"); + fileWriter.write("\nexec sleep 100"); + } + fileWriter.close(); + FileContext localFS = FileContext.getLocalFSFileContext(); + URL resource_alpha = + URL.fromPath(localFS + .makeQualified(new Path(scriptFile.getAbsolutePath()))); + LocalResource rsrc_alpha = RecordFactoryProvider + .getRecordFactory(null).newRecordInstance(LocalResource.class); + rsrc_alpha.setResource(resource_alpha); + rsrc_alpha.setSize(-1); + rsrc_alpha.setVisibility(LocalResourceVisibility.APPLICATION); + rsrc_alpha.setType(LocalResourceType.FILE); + rsrc_alpha.setTimestamp(scriptFile.lastModified()); + String destinationFile = "dest_file"; + Map localResources = new HashMap<>(); + localResources.put(destinationFile, rsrc_alpha); + List commands = + Arrays.asList(Shell.getRunScriptCommand(scriptFile)); + ContainerLaunchContext clc = ContainerLaunchContext.newInstance( + localResources, containerEnv, commands, serviceData, + containerTokens, acls); + StartContainersResponse startResponse = startContainer( + context, cm, cid, clc, null); + assertTrue(startResponse.getFailedRequests().isEmpty()); + assertEquals(1, context.getApplications().size()); + // make sure 
the container reaches RUNNING state + waitForNMContainerState(cm, cid, + org.apache.hadoop.yarn.server.nodemanager + .containermanager.container.ContainerState.RUNNING); + } + private ContainerManagerImpl createContainerManager(Context context, DeletionService delSrvc) { return new ContainerManagerImpl(context, exec, delSrvc, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java index c1638df7b5..6d6875d6b9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.nodemanager.recovery; import java.io.IOException; +import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -40,6 +41,7 @@ import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LogDeleterProto; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings; public class NMMemoryStateStoreService extends NMStateStoreService { private Map apps; @@ -119,6 +121,7 @@ public synchronized List loadContainersState() rcsCopy.setRemainingRetryAttempts(rcs.getRemainingRetryAttempts()); rcsCopy.setWorkDir(rcs.getWorkDir()); rcsCopy.setLogDir(rcs.getLogDir()); + 
rcsCopy.setResourceMappings(rcs.getResourceMappings()); result.add(rcsCopy); } return result; @@ -480,6 +483,17 @@ public synchronized void removeAMRMProxyAppContext( amrmProxyState.getAppContexts().remove(attempt); } + @Override + public void storeAssignedResources(ContainerId containerId, + String resourceType, List assignedResources) + throws IOException { + ResourceMappings.AssignedResources ar = + new ResourceMappings.AssignedResources(); + ar.updateAssignedResources(assignedResources); + containerStates.get(containerId).getResourceMappings() + .addAssignedResources(resourceType, ar); + } + private static class TrackerState { Map inProgressMap = new HashMap(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java index b0a9bc92b1..b76f1fface 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java @@ -32,6 +32,7 @@ import java.io.File; import java.io.IOException; +import java.io.Serializable; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; @@ -961,46 +962,12 @@ public void testUnexpectedKeyDoesntThrowException() throws IOException { .loadContainersState(); assertTrue(recoveredContainers.isEmpty()); - // create a container request ApplicationId appId = ApplicationId.newInstance(1234, 3); ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 4); ContainerId 
containerId = ContainerId.newContainerId(appAttemptId, 5); - LocalResource lrsrc = LocalResource.newInstance( - URL.newInstance("hdfs", "somehost", 12345, "/some/path/to/rsrc"), - LocalResourceType.FILE, LocalResourceVisibility.APPLICATION, 123L, - 1234567890L); - Map localResources = - new HashMap(); - localResources.put("rsrc", lrsrc); - Map env = new HashMap(); - env.put("somevar", "someval"); - List containerCmds = new ArrayList(); - containerCmds.add("somecmd"); - containerCmds.add("somearg"); - Map serviceData = new HashMap(); - serviceData.put("someservice", - ByteBuffer.wrap(new byte[] { 0x1, 0x2, 0x3 })); - ByteBuffer containerTokens = ByteBuffer - .wrap(new byte[] { 0x7, 0x8, 0x9, 0xa }); - Map acls = - new HashMap(); - acls.put(ApplicationAccessType.VIEW_APP, "viewuser"); - acls.put(ApplicationAccessType.MODIFY_APP, "moduser"); - ContainerLaunchContext clc = ContainerLaunchContext.newInstance( - localResources, env, containerCmds, - serviceData, containerTokens, acls); - Resource containerRsrc = Resource.newInstance(1357, 3); - ContainerTokenIdentifier containerTokenId = new ContainerTokenIdentifier( - containerId, "host", "user", containerRsrc, 9876543210L, 42, 2468, - Priority.newInstance(7), 13579); - Token containerToken = Token.newInstance(containerTokenId.getBytes(), - ContainerTokenIdentifier.KIND.toString(), "password".getBytes(), - "tokenservice"); - StartContainerRequest containerReq = StartContainerRequest.newInstance(clc, - containerToken); - - stateStore.storeContainer(containerId, 0, 0, containerReq); + StartContainerRequest startContainerRequest = storeMockContainer( + containerId); // add a invalid key byte[] invalidKey = ("ContainerManager/containers/" @@ -1013,7 +980,7 @@ public void testUnexpectedKeyDoesntThrowException() throws IOException { assertEquals(RecoveredContainerStatus.REQUESTED, rcs.getStatus()); assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode()); assertEquals(false, rcs.getKilled()); - assertEquals(containerReq, 
rcs.getStartRequest()); + assertEquals(startContainerRequest, rcs.getStartRequest()); assertTrue(rcs.getDiagnostics().isEmpty()); assertEquals(RecoveredContainerType.KILL, rcs.getRecoveryType()); // assert unknown keys are cleaned up finally @@ -1121,6 +1088,86 @@ public void testAMRMProxyStorage() throws IOException { } } + @Test + public void testStateStoreForResourceMapping() throws IOException { + // test empty when no state + List recoveredContainers = stateStore + .loadContainersState(); + assertTrue(recoveredContainers.isEmpty()); + + ApplicationId appId = ApplicationId.newInstance(1234, 3); + ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, + 4); + ContainerId containerId = ContainerId.newContainerId(appAttemptId, 5); + storeMockContainer(containerId); + + // Store ResourceMapping + stateStore.storeAssignedResources(containerId, "gpu", + Arrays.asList("1", "2", "3")); + // This will overwrite the gpu assignment stored above + List gpuRes1 = Arrays.asList("1", "2", "4"); + stateStore.storeAssignedResources(containerId, "gpu", gpuRes1); + List fpgaRes = Arrays.asList("3", "4", "5", "6"); + stateStore.storeAssignedResources(containerId, "fpga", fpgaRes); + List numaRes = Arrays.asList("numa1"); + stateStore.storeAssignedResources(containerId, "numa", numaRes); + + // restart the state store and verify the mappings are recovered + restartStateStore(); + recoveredContainers = stateStore.loadContainersState(); + assertEquals(1, recoveredContainers.size()); + RecoveredContainerState rcs = recoveredContainers.get(0); + List res = rcs.getResourceMappings() + .getAssignedResources("gpu"); + Assert.assertTrue(res.equals(gpuRes1)); + + res = rcs.getResourceMappings().getAssignedResources("fpga"); + Assert.assertTrue(res.equals(fpgaRes)); + + res = rcs.getResourceMappings().getAssignedResources("numa"); + Assert.assertTrue(res.equals(numaRes)); + } + + private StartContainerRequest storeMockContainer(ContainerId containerId) + throws IOException { + // create a container request + LocalResource lrsrc = 
LocalResource.newInstance( + URL.newInstance("hdfs", "somehost", 12345, "/some/path/to/rsrc"), + LocalResourceType.FILE, LocalResourceVisibility.APPLICATION, 123L, + 1234567890L); + Map localResources = + new HashMap(); + localResources.put("rsrc", lrsrc); + Map env = new HashMap(); + env.put("somevar", "someval"); + List containerCmds = new ArrayList(); + containerCmds.add("somecmd"); + containerCmds.add("somearg"); + Map serviceData = new HashMap(); + serviceData.put("someservice", + ByteBuffer.wrap(new byte[] { 0x1, 0x2, 0x3 })); + ByteBuffer containerTokens = ByteBuffer + .wrap(new byte[] { 0x7, 0x8, 0x9, 0xa }); + Map acls = + new HashMap(); + acls.put(ApplicationAccessType.VIEW_APP, "viewuser"); + acls.put(ApplicationAccessType.MODIFY_APP, "moduser"); + ContainerLaunchContext clc = ContainerLaunchContext.newInstance( + localResources, env, containerCmds, + serviceData, containerTokens, acls); + Resource containerRsrc = Resource.newInstance(1357, 3); + ContainerTokenIdentifier containerTokenId = new ContainerTokenIdentifier( + containerId, "host", "user", containerRsrc, 9876543210L, 42, 2468, + Priority.newInstance(7), 13579); + Token containerToken = Token.newInstance(containerTokenId.getBytes(), + ContainerTokenIdentifier.KIND.toString(), "password".getBytes(), + "tokenservice"); + StartContainerRequest containerReq = StartContainerRequest.newInstance(clc, + containerToken); + stateStore.storeContainer(containerId, 0, 0, containerReq); + return containerReq; + } + private static class NMTokenSecretManagerForTest extends BaseNMTokenSecretManager { public MasterKey generateKey() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java index 57bee8c665..d435ba085f 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java @@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceSet; import org.apache.hadoop.yarn.server.utils.BuilderUtils; @@ -239,4 +240,9 @@ public boolean isRecovering() { public long getContainerStartTime() { return 0; } + + @Override + public ResourceMappings getResourceMappings() { + return null; /* this mock always returns null; callers must not dereference the result */ + } }