YARN-7033. Add support for NM Recovery of assigned resources (e.g. GPU's, NUMA, FPGA's) to container. (Devaraj K and Wangda Tan)

Change-Id: Iffd18bb95debe1c8cc55e30abc1d8f663e9d0e30
This commit is contained in:
Wangda Tan 2017-09-07 14:13:37 -07:00
parent a4cd101934
commit f155ab7cfa
10 changed files with 431 additions and 86 deletions

View File

@ -96,4 +96,11 @@ public interface Container extends EventHandler<ContainerEvent> {
void sendKillEvent(int exitStatus, String description);
boolean isRecovering();
/**
* Get assigned resource mappings to the container.
*
* @return Resource Mappings of the container
*/
ResourceMappings getResourceMappings();
}

View File

@ -185,6 +185,7 @@ private ReInitializationContext createContextForRollback() {
private boolean recoveredAsKilled = false;
private Context context;
private ResourceSet resourceSet;
private ResourceMappings resourceMappings;
public ContainerImpl(Configuration conf, Dispatcher dispatcher,
ContainerLaunchContext launchContext, Credentials creds,
@ -242,6 +243,7 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher,
stateMachine = stateMachineFactory.make(this);
this.context = context;
this.resourceSet = new ResourceSet();
this.resourceMappings = new ResourceMappings();
}
private static ContainerRetryContext configureRetryContext(
@ -282,6 +284,7 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher,
this.remainingRetryAttempts = rcs.getRemainingRetryAttempts();
this.workDir = rcs.getWorkDir();
this.logDir = rcs.getLogDir();
this.resourceMappings = rcs.getResourceMappings();
}
private static final ContainerDiagnosticsUpdateTransition UPDATE_DIAGNOSTICS_TRANSITION =
@ -1789,4 +1792,14 @@ public boolean isRecovering() {
getContainerState() == ContainerState.NEW);
return isRecovering;
}
/**
* Get assigned resource mappings to the container.
*
* @return Resource Mappings of the container
*/
@Override
public ResourceMappings getResourceMappings() {
return resourceMappings;
}
}

View File

@ -0,0 +1,124 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.io.IOUtils;
/**
* This class is used to store assigned resource to a single container by
* resource types.
*
* Assigned resource could be list of String
*
* For example, we can assign container to:
* "numa": ["numa0"]
* "gpu": ["0", "1", "2", "3"]
* "fpga": ["1", "3"]
*
* This will be used for NM restart container recovery.
*/
public class ResourceMappings {
private Map<String, AssignedResources> assignedResourcesMap = new HashMap<>();
/**
* Get all resource mappings.
* @param resourceType resourceType
* @return map of resource mapping
*/
public List<Serializable> getAssignedResources(String resourceType) {
AssignedResources ar = assignedResourcesMap.get(resourceType);
if (null == ar) {
return Collections.emptyList();
}
return ar.getAssignedResources();
}
/**
* Adds the resources for a given resource type.
*
* @param resourceType Resource Type
* @param assigned Assigned resources to add
*/
public void addAssignedResources(String resourceType,
AssignedResources assigned) {
assignedResourcesMap.put(resourceType, assigned);
}
/**
* Stores resources assigned to a container for a given resource type.
*/
public static class AssignedResources implements Serializable {
private static final long serialVersionUID = -1059491941955757926L;
private List<Serializable> resources = Collections.emptyList();
public List<Serializable> getAssignedResources() {
return Collections.unmodifiableList(resources);
}
public void updateAssignedResources(List<Serializable> list) {
this.resources = new ArrayList<>(list);
}
@SuppressWarnings("unchecked")
public static AssignedResources fromBytes(byte[] bytes)
throws IOException {
ObjectInputStream ois = null;
List<Serializable> resources;
try {
ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
ois = new ObjectInputStream(bis);
resources = (List<Serializable>) ois.readObject();
} catch (ClassNotFoundException e) {
throw new IOException(e);
} finally {
IOUtils.closeQuietly(ois);
}
AssignedResources ar = new AssignedResources();
ar.updateAssignedResources(resources);
return ar;
}
public byte[] toBytes() throws IOException {
ObjectOutputStream oos = null;
byte[] bytes;
try {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
oos = new ObjectOutputStream(bos);
oos.writeObject(resources);
bytes = bos.toByteArray();
} finally {
IOUtils.closeQuietly(oos);
}
return bytes;
}
}
}

View File

@ -24,6 +24,7 @@
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@ -39,6 +40,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StartContainerRequestPBImpl;
@ -60,6 +62,7 @@
import org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainerRequestProto;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
import org.apache.hadoop.yarn.server.utils.LeveldbIterator;
@ -144,6 +147,9 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
private static final String AMRMPROXY_KEY_PREFIX = "AMRMProxy/";
private static final String CONTAINER_ASSIGNED_RESOURCES_KEY_SUFFIX =
"/assignedResources_";
private static final byte[] EMPTY_VALUE = new byte[0];
private DB db;
@ -286,6 +292,13 @@ private RecoveredContainerState loadContainerState(ContainerId containerId,
rcs.setWorkDir(asString(entry.getValue()));
} else if (suffix.equals(CONTAINER_LOG_DIR_KEY_SUFFIX)) {
rcs.setLogDir(asString(entry.getValue()));
} else if (suffix.startsWith(CONTAINER_ASSIGNED_RESOURCES_KEY_SUFFIX)) {
String resourceType = suffix.substring(
CONTAINER_ASSIGNED_RESOURCES_KEY_SUFFIX.length());
ResourceMappings.AssignedResources assignedResources =
ResourceMappings.AssignedResources.fromBytes(entry.getValue());
rcs.getResourceMappings().addAssignedResources(resourceType,
assignedResources);
} else {
LOG.warn("the container " + containerId
+ " will be killed because of the unknown key " + key
@ -1091,6 +1104,35 @@ public void removeLogDeleter(ApplicationId appId) throws IOException {
}
}
@Override
public void storeAssignedResources(ContainerId containerId,
String resourceType, List<Serializable> assignedResources)
throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("storeAssignedResources: containerId=" + containerId
+ ", assignedResources=" + StringUtils.join(",", assignedResources));
}
String keyResChng = CONTAINERS_KEY_PREFIX + containerId.toString()
+ CONTAINER_ASSIGNED_RESOURCES_KEY_SUFFIX + resourceType;
try {
WriteBatch batch = db.createWriteBatch();
try {
ResourceMappings.AssignedResources res =
new ResourceMappings.AssignedResources();
res.updateAssignedResources(assignedResources);
// New value will overwrite old values for the same key
batch.put(bytes(keyResChng), res.toBytes());
db.write(batch);
} finally {
batch.close();
}
} catch (DBException e) {
throw new IOException(e);
}
}
@SuppressWarnings("deprecation")
private void cleanupDeprecatedFinishedApps() {
try {

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.nodemanager.recovery;
import java.io.IOException;
import java.io.Serializable;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
@ -257,6 +258,12 @@ public void removeAMRMProxyAppContext(ApplicationAttemptId attempt)
throws IOException {
}
@Override
public void storeAssignedResources(ContainerId containerId,
String resourceType, List<Serializable> assignedResources)
throws IOException {
}
@Override
protected void initStorage(Configuration conf) throws IOException {
}

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.nodemanager.recovery;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@ -42,6 +43,7 @@
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LogDeleterProto;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings;
@Private
@Unstable
@ -88,6 +90,7 @@ public static class RecoveredContainerState {
private RecoveredContainerType recoveryType =
RecoveredContainerType.RECOVER;
private long startTime;
private ResourceMappings resMappings = new ResourceMappings();
public RecoveredContainerStatus getStatus() {
return status;
@ -172,6 +175,14 @@ public RecoveredContainerType getRecoveryType() {
public void setRecoveryType(RecoveredContainerType recoveryType) {
this.recoveryType = recoveryType;
}
public ResourceMappings getResourceMappings() {
return resMappings;
}
public void setResourceMappings(ResourceMappings mappings) {
this.resMappings = mappings;
}
}
public static class LocalResourceTrackerState {
@ -699,6 +710,18 @@ public abstract void removeAMRMProxyAppContextEntry(
public abstract void removeAMRMProxyAppContext(ApplicationAttemptId attempt)
throws IOException;
/**
* Store the assigned resources to a container.
*
* @param containerId Container Id
* @param resourceType Resource Type
* @param assignedResources Assigned resources
* @throws IOException if fails
*/
public abstract void storeAssignedResources(ContainerId containerId,
String resourceType, List<Serializable> assignedResources)
throws IOException;
protected abstract void initStorage(Configuration conf) throws IOException;
protected abstract void startStorage() throws IOException;

View File

@ -31,6 +31,7 @@
import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
@ -90,6 +91,7 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncher;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
@ -108,6 +110,7 @@
import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@ -400,9 +403,8 @@ public void testContainerResizeRecovery() throws Exception {
NMStateStoreService stateStore = new NMMemoryStateStoreService();
stateStore.init(conf);
stateStore.start();
Context context = createContext(conf, stateStore);
context = createContext(conf, stateStore);
ContainerManagerImpl cm = createContainerManager(context, delSrvc);
cm.dispatcher.disableExitOnDispatchException();
cm.init(conf);
cm.start();
// add an application by starting a container
@ -410,55 +412,12 @@ public void testContainerResizeRecovery() throws Exception {
ApplicationAttemptId attemptId =
ApplicationAttemptId.newInstance(appId, 1);
ContainerId cid = ContainerId.newContainerId(attemptId, 1);
Map<String, String> containerEnv = new HashMap<>();
setFlowContext(containerEnv, "app_name1", appId);
Map<String, ByteBuffer> serviceData = Collections.emptyMap();
Credentials containerCreds = new Credentials();
DataOutputBuffer dob = new DataOutputBuffer();
containerCreds.writeTokenStorageToStream(dob);
ByteBuffer containerTokens = ByteBuffer.wrap(dob.getData(), 0,
dob.getLength());
Map<ApplicationAccessType, String> acls = Collections.emptyMap();
File tmpDir = new File("target",
this.getClass().getSimpleName() + "-tmpDir");
File scriptFile = Shell.appendScriptExtension(tmpDir, "scriptFile");
PrintWriter fileWriter = new PrintWriter(scriptFile);
if (Shell.WINDOWS) {
fileWriter.println("@ping -n 100 127.0.0.1 >nul");
} else {
fileWriter.write("\numask 0");
fileWriter.write("\nexec sleep 100");
}
fileWriter.close();
FileContext localFS = FileContext.getLocalFSFileContext();
URL resource_alpha =
URL.fromPath(localFS
.makeQualified(new Path(scriptFile.getAbsolutePath())));
LocalResource rsrc_alpha = RecordFactoryProvider
.getRecordFactory(null).newRecordInstance(LocalResource.class);
rsrc_alpha.setResource(resource_alpha);
rsrc_alpha.setSize(-1);
rsrc_alpha.setVisibility(LocalResourceVisibility.APPLICATION);
rsrc_alpha.setType(LocalResourceType.FILE);
rsrc_alpha.setTimestamp(scriptFile.lastModified());
String destinationFile = "dest_file";
Map<String, LocalResource> localResources = new HashMap<>();
localResources.put(destinationFile, rsrc_alpha);
List<String> commands =
Arrays.asList(Shell.getRunScriptCommand(scriptFile));
ContainerLaunchContext clc = ContainerLaunchContext.newInstance(
localResources, containerEnv, commands, serviceData,
containerTokens, acls);
StartContainersResponse startResponse = startContainer(
context, cm, cid, clc, null);
assertTrue(startResponse.getFailedRequests().isEmpty());
assertEquals(1, context.getApplications().size());
commonLaunchContainer(appId, cid, cm);
Application app = context.getApplications().get(appId);
assertNotNull(app);
// make sure the container reaches RUNNING state
waitForNMContainerState(cm, cid,
org.apache.hadoop.yarn.server.nodemanager
.containermanager.container.ContainerState.RUNNING);
Resource targetResource = Resource.newInstance(2048, 2);
ContainerUpdateResponse updateResponse =
updateContainers(context, cm, cid, targetResource);
@ -480,6 +439,58 @@ public void testContainerResizeRecovery() throws Exception {
assertEquals(targetResource, containerStatus.getCapability());
}
@Test
public void testResourceMappingRecoveryForContainer() throws Exception {
conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true);
conf.setBoolean(YarnConfiguration.NM_RECOVERY_SUPERVISED, true);
NMStateStoreService stateStore = new NMMemoryStateStoreService();
stateStore.init(conf);
stateStore.start();
context = createContext(conf, stateStore);
ContainerManagerImpl cm = createContainerManager(context, delSrvc);
cm.init(conf);
cm.start();
// add an application by starting a container
ApplicationId appId = ApplicationId.newInstance(0, 1);
ApplicationAttemptId attemptId =
ApplicationAttemptId.newInstance(appId, 1);
ContainerId cid = ContainerId.newContainerId(attemptId, 1);
commonLaunchContainer(appId, cid, cm);
Application app = context.getApplications().get(appId);
assertNotNull(app);
// store resource mapping of the container
List<Serializable> gpuResources = Arrays.asList("1", "2", "3");
stateStore.storeAssignedResources(cid, "gpu", gpuResources);
List<Serializable> numaResources = Arrays.asList("numa1");
stateStore.storeAssignedResources(cid, "numa", numaResources);
List<Serializable> fpgaResources = Arrays.asList("fpga1", "fpga2");
stateStore.storeAssignedResources(cid, "fpga", fpgaResources);
cm.stop();
context = createContext(conf, stateStore);
cm = createContainerManager(context);
cm.init(conf);
cm.start();
assertEquals(1, context.getApplications().size());
app = context.getApplications().get(appId);
assertNotNull(app);
Container nmContainer = context.getContainers().get(cid);
Assert.assertNotNull(nmContainer);
ResourceMappings resourceMappings = nmContainer.getResourceMappings();
List<Serializable> assignedResource = resourceMappings
.getAssignedResources("gpu");
Assert.assertTrue(assignedResource.equals(gpuResources));
Assert.assertTrue(
resourceMappings.getAssignedResources("numa").equals(numaResources));
Assert.assertTrue(
resourceMappings.getAssignedResources("fpga").equals(fpgaResources));
}
@Test
public void testContainerCleanupOnShutdown() throws Exception {
ApplicationId appId = ApplicationId.newInstance(0, 1);
@ -552,6 +563,57 @@ public void testContainerCleanupOnShutdown() throws Exception {
verify(cm, never()).handle(isA(CMgrCompletedAppsEvent.class));
}
private void commonLaunchContainer(ApplicationId appId, ContainerId cid,
ContainerManagerImpl cm) throws Exception {
Map<String, String> containerEnv = new HashMap<>();
setFlowContext(containerEnv, "app_name1", appId);
Map<String, ByteBuffer> serviceData = Collections.emptyMap();
Credentials containerCreds = new Credentials();
DataOutputBuffer dob = new DataOutputBuffer();
containerCreds.writeTokenStorageToStream(dob);
ByteBuffer containerTokens = ByteBuffer.wrap(dob.getData(), 0,
dob.getLength());
Map<ApplicationAccessType, String> acls = Collections.emptyMap();
File tmpDir = new File("target",
this.getClass().getSimpleName() + "-tmpDir");
File scriptFile = Shell.appendScriptExtension(tmpDir, "scriptFile");
PrintWriter fileWriter = new PrintWriter(scriptFile);
if (Shell.WINDOWS) {
fileWriter.println("@ping -n 100 127.0.0.1 >nul");
} else {
fileWriter.write("\numask 0");
fileWriter.write("\nexec sleep 100");
}
fileWriter.close();
FileContext localFS = FileContext.getLocalFSFileContext();
URL resource_alpha =
URL.fromPath(localFS
.makeQualified(new Path(scriptFile.getAbsolutePath())));
LocalResource rsrc_alpha = RecordFactoryProvider
.getRecordFactory(null).newRecordInstance(LocalResource.class);
rsrc_alpha.setResource(resource_alpha);
rsrc_alpha.setSize(-1);
rsrc_alpha.setVisibility(LocalResourceVisibility.APPLICATION);
rsrc_alpha.setType(LocalResourceType.FILE);
rsrc_alpha.setTimestamp(scriptFile.lastModified());
String destinationFile = "dest_file";
Map<String, LocalResource> localResources = new HashMap<>();
localResources.put(destinationFile, rsrc_alpha);
List<String> commands =
Arrays.asList(Shell.getRunScriptCommand(scriptFile));
ContainerLaunchContext clc = ContainerLaunchContext.newInstance(
localResources, containerEnv, commands, serviceData,
containerTokens, acls);
StartContainersResponse startResponse = startContainer(
context, cm, cid, clc, null);
assertTrue(startResponse.getFailedRequests().isEmpty());
assertEquals(1, context.getApplications().size());
// make sure the container reaches RUNNING state
waitForNMContainerState(cm, cid,
org.apache.hadoop.yarn.server.nodemanager
.containermanager.container.ContainerState.RUNNING);
}
private ContainerManagerImpl createContainerManager(Context context,
DeletionService delSrvc) {
return new ContainerManagerImpl(context, exec, delSrvc,

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.nodemanager.recovery;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@ -40,6 +41,7 @@
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LogDeleterProto;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings;
public class NMMemoryStateStoreService extends NMStateStoreService {
private Map<ApplicationId, ContainerManagerApplicationProto> apps;
@ -119,6 +121,7 @@ public synchronized List<RecoveredContainerState> loadContainersState()
rcsCopy.setRemainingRetryAttempts(rcs.getRemainingRetryAttempts());
rcsCopy.setWorkDir(rcs.getWorkDir());
rcsCopy.setLogDir(rcs.getLogDir());
rcsCopy.setResourceMappings(rcs.getResourceMappings());
result.add(rcsCopy);
}
return result;
@ -480,6 +483,17 @@ public synchronized void removeAMRMProxyAppContext(
amrmProxyState.getAppContexts().remove(attempt);
}
@Override
public void storeAssignedResources(ContainerId containerId,
String resourceType, List<Serializable> assignedResources)
throws IOException {
ResourceMappings.AssignedResources ar =
new ResourceMappings.AssignedResources();
ar.updateAssignedResources(assignedResources);
containerStates.get(containerId).getResourceMappings()
.addAssignedResources(resourceType, ar);
}
private static class TrackerState {
Map<Path, LocalResourceProto> inProgressMap =
new HashMap<Path, LocalResourceProto>();

View File

@ -32,6 +32,7 @@
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
@ -961,46 +962,12 @@ public void testUnexpectedKeyDoesntThrowException() throws IOException {
.loadContainersState();
assertTrue(recoveredContainers.isEmpty());
// create a container request
ApplicationId appId = ApplicationId.newInstance(1234, 3);
ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId,
4);
ContainerId containerId = ContainerId.newContainerId(appAttemptId, 5);
LocalResource lrsrc = LocalResource.newInstance(
URL.newInstance("hdfs", "somehost", 12345, "/some/path/to/rsrc"),
LocalResourceType.FILE, LocalResourceVisibility.APPLICATION, 123L,
1234567890L);
Map<String, LocalResource> localResources =
new HashMap<String, LocalResource>();
localResources.put("rsrc", lrsrc);
Map<String, String> env = new HashMap<String, String>();
env.put("somevar", "someval");
List<String> containerCmds = new ArrayList<String>();
containerCmds.add("somecmd");
containerCmds.add("somearg");
Map<String, ByteBuffer> serviceData = new HashMap<String, ByteBuffer>();
serviceData.put("someservice",
ByteBuffer.wrap(new byte[] { 0x1, 0x2, 0x3 }));
ByteBuffer containerTokens = ByteBuffer
.wrap(new byte[] { 0x7, 0x8, 0x9, 0xa });
Map<ApplicationAccessType, String> acls =
new HashMap<ApplicationAccessType, String>();
acls.put(ApplicationAccessType.VIEW_APP, "viewuser");
acls.put(ApplicationAccessType.MODIFY_APP, "moduser");
ContainerLaunchContext clc = ContainerLaunchContext.newInstance(
localResources, env, containerCmds,
serviceData, containerTokens, acls);
Resource containerRsrc = Resource.newInstance(1357, 3);
ContainerTokenIdentifier containerTokenId = new ContainerTokenIdentifier(
containerId, "host", "user", containerRsrc, 9876543210L, 42, 2468,
Priority.newInstance(7), 13579);
Token containerToken = Token.newInstance(containerTokenId.getBytes(),
ContainerTokenIdentifier.KIND.toString(), "password".getBytes(),
"tokenservice");
StartContainerRequest containerReq = StartContainerRequest.newInstance(clc,
containerToken);
stateStore.storeContainer(containerId, 0, 0, containerReq);
StartContainerRequest startContainerRequest = storeMockContainer(
containerId);
// add a invalid key
byte[] invalidKey = ("ContainerManager/containers/"
@ -1013,7 +980,7 @@ public void testUnexpectedKeyDoesntThrowException() throws IOException {
assertEquals(RecoveredContainerStatus.REQUESTED, rcs.getStatus());
assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode());
assertEquals(false, rcs.getKilled());
assertEquals(containerReq, rcs.getStartRequest());
assertEquals(startContainerRequest, rcs.getStartRequest());
assertTrue(rcs.getDiagnostics().isEmpty());
assertEquals(RecoveredContainerType.KILL, rcs.getRecoveryType());
// assert unknown keys are cleaned up finally
@ -1121,6 +1088,86 @@ public void testAMRMProxyStorage() throws IOException {
}
}
@Test
public void testStateStoreForResourceMapping() throws IOException {
// test empty when no state
List<RecoveredContainerState> recoveredContainers = stateStore
.loadContainersState();
assertTrue(recoveredContainers.isEmpty());
ApplicationId appId = ApplicationId.newInstance(1234, 3);
ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId,
4);
ContainerId containerId = ContainerId.newContainerId(appAttemptId, 5);
storeMockContainer(containerId);
// Store ResourceMapping
stateStore.storeAssignedResources(containerId, "gpu",
Arrays.asList("1", "2", "3"));
// This will overwrite above
List<Serializable> gpuRes1 = Arrays.asList("1", "2", "4");
stateStore.storeAssignedResources(containerId, "gpu", gpuRes1);
List<Serializable> fpgaRes = Arrays.asList("3", "4", "5", "6");
stateStore.storeAssignedResources(containerId, "fpga", fpgaRes);
List<Serializable> numaRes = Arrays.asList("numa1");
stateStore.storeAssignedResources(containerId, "numa", numaRes);
// add a invalid key
restartStateStore();
recoveredContainers = stateStore.loadContainersState();
assertEquals(1, recoveredContainers.size());
RecoveredContainerState rcs = recoveredContainers.get(0);
List<Serializable> res = rcs.getResourceMappings()
.getAssignedResources("gpu");
Assert.assertTrue(res.equals(gpuRes1));
res = rcs.getResourceMappings().getAssignedResources("fpga");
Assert.assertTrue(res.equals(fpgaRes));
res = rcs.getResourceMappings().getAssignedResources("numa");
Assert.assertTrue(res.equals(numaRes));
}
private StartContainerRequest storeMockContainer(ContainerId containerId)
throws IOException {
// create a container request
LocalResource lrsrc = LocalResource.newInstance(
URL.newInstance("hdfs", "somehost", 12345, "/some/path/to/rsrc"),
LocalResourceType.FILE, LocalResourceVisibility.APPLICATION, 123L,
1234567890L);
Map<String, LocalResource> localResources =
new HashMap<String, LocalResource>();
localResources.put("rsrc", lrsrc);
Map<String, String> env = new HashMap<String, String>();
env.put("somevar", "someval");
List<String> containerCmds = new ArrayList<String>();
containerCmds.add("somecmd");
containerCmds.add("somearg");
Map<String, ByteBuffer> serviceData = new HashMap<String, ByteBuffer>();
serviceData.put("someservice",
ByteBuffer.wrap(new byte[] { 0x1, 0x2, 0x3 }));
ByteBuffer containerTokens = ByteBuffer
.wrap(new byte[] { 0x7, 0x8, 0x9, 0xa });
Map<ApplicationAccessType, String> acls =
new HashMap<ApplicationAccessType, String>();
acls.put(ApplicationAccessType.VIEW_APP, "viewuser");
acls.put(ApplicationAccessType.MODIFY_APP, "moduser");
ContainerLaunchContext clc = ContainerLaunchContext.newInstance(
localResources, env, containerCmds,
serviceData, containerTokens, acls);
Resource containerRsrc = Resource.newInstance(1357, 3);
ContainerTokenIdentifier containerTokenId = new ContainerTokenIdentifier(
containerId, "host", "user", containerRsrc, 9876543210L, 42, 2468,
Priority.newInstance(7), 13579);
Token containerToken = Token.newInstance(containerTokenId.getBytes(),
ContainerTokenIdentifier.KIND.toString(), "password".getBytes(),
"tokenservice");
StartContainerRequest containerReq = StartContainerRequest.newInstance(clc,
containerToken);
stateStore.storeContainer(containerId, 0, 0, containerReq);
return containerReq;
}
private static class NMTokenSecretManagerForTest extends
BaseNMTokenSecretManager {
public MasterKey generateKey() {

View File

@ -37,6 +37,7 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceSet;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
@ -239,4 +240,9 @@ public boolean isRecovering() {
public long getContainerStartTime() {
return 0;
}
@Override
public ResourceMappings getResourceMappings() {
return null;
}
}