YARN-11198. clean up numa resources from statestore (#4546)
* YARN-11198. clean up numa resources from levelDB
Co-authored-by: Deb <dbsamrat@3c22fba1b03f.ant.amazon.com>
This commit is contained in:
parent
6f9c4359ec
commit
84ce592a85
@ -231,7 +231,7 @@ public synchronized NumaResourceAllocation allocateNumaNodes(
|
|||||||
}
|
}
|
||||||
|
|
||||||
private NumaResourceAllocation allocate(ContainerId containerId,
|
private NumaResourceAllocation allocate(ContainerId containerId,
|
||||||
Resource resource) {
|
Resource resource) throws ResourceHandlerException {
|
||||||
for (int index = 0; index < numaNodesList.size(); index++) {
|
for (int index = 0; index < numaNodesList.size(); index++) {
|
||||||
NumaNodeResource numaNode = numaNodesList
|
NumaNodeResource numaNode = numaNodesList
|
||||||
.get((currentAssignNode + index) % numaNodesList.size());
|
.get((currentAssignNode + index) % numaNodesList.size());
|
||||||
@ -306,12 +306,20 @@ private NumaResourceAllocation allocate(ContainerId containerId,
|
|||||||
* Release assigned NUMA resources for the container.
|
* Release assigned NUMA resources for the container.
|
||||||
*
|
*
|
||||||
* @param containerId the container ID
|
* @param containerId the container ID
|
||||||
|
* @throws ResourceHandlerException if releasing the NUMA resources fails
|
||||||
*/
|
*/
|
||||||
public synchronized void releaseNumaResource(ContainerId containerId) {
|
public synchronized void releaseNumaResource(ContainerId containerId)
|
||||||
|
throws ResourceHandlerException {
|
||||||
LOG.info("Releasing the assigned NUMA resources for " + containerId);
|
LOG.info("Releasing the assigned NUMA resources for " + containerId);
|
||||||
for (NumaNodeResource numaNode : numaNodesList) {
|
for (NumaNodeResource numaNode : numaNodesList) {
|
||||||
numaNode.releaseResources(containerId);
|
numaNode.releaseResources(containerId);
|
||||||
}
|
}
|
||||||
|
// delete from NM State store
|
||||||
|
try {
|
||||||
|
context.getNMStateStore().releaseAssignedResources(containerId, NUMA_RESOURCE_TYPE);
|
||||||
|
} catch (IOException e){
|
||||||
|
throw new ResourceHandlerException(e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -1813,4 +1813,21 @@ protected void checkVersion() throws IOException {
|
|||||||
+ getCurrentVersion() + ", but loading version " + loadedVersion);
|
+ getCurrentVersion() + ", but loading version " + loadedVersion);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@Override
|
||||||
|
public void releaseAssignedResources(ContainerId containerId, String resourceType)
|
||||||
|
throws IOException {
|
||||||
|
LOG.debug("releaseAssignedResources: containerId=" + containerId + " resourceType="
|
||||||
|
+ resourceType);
|
||||||
|
try {
|
||||||
|
try (WriteBatch batch = db.createWriteBatch()) {
|
||||||
|
String key = CONTAINERS_KEY_PREFIX + containerId
|
||||||
|
+ CONTAINER_ASSIGNED_RESOURCES_KEY_SUFFIX + resourceType;
|
||||||
|
batch.delete(bytes(key));
|
||||||
|
db.write(batch);
|
||||||
|
}
|
||||||
|
}catch (DBException e){
|
||||||
|
markStoreUnHealthy(e);
|
||||||
|
throw new IOException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -786,6 +786,15 @@ public abstract void storeAssignedResources(Container container,
|
|||||||
String resourceType, List<Serializable> assignedResources)
|
String resourceType, List<Serializable> assignedResources)
|
||||||
throws IOException;
|
throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Delete the resources of the given resource type assigned to a container.
|
||||||
|
* @param containerId Container Id
|
||||||
|
* @param resourceType resource Type
|
||||||
|
* @throws IOException while releasing resources
|
||||||
|
*/
|
||||||
|
public void releaseAssignedResources(ContainerId containerId, String resourceType)
|
||||||
|
throws IOException {}
|
||||||
|
|
||||||
protected abstract void initStorage(Configuration conf) throws IOException;
|
protected abstract void initStorage(Configuration conf) throws IOException;
|
||||||
|
|
||||||
protected abstract void startStorage() throws IOException;
|
protected abstract void startStorage() throws IOException;
|
||||||
|
@ -1756,6 +1756,18 @@ public void testStateStoreForResourceMapping() throws IOException {
|
|||||||
resources = rcs.getResourceMappings().getAssignedResources("numa");
|
resources = rcs.getResourceMappings().getAssignedResources("numa");
|
||||||
Assert.assertEquals(numaRes, resources);
|
Assert.assertEquals(numaRes, resources);
|
||||||
Assert.assertEquals(numaRes, resourceMappings.getAssignedResources("numa"));
|
Assert.assertEquals(numaRes, resourceMappings.getAssignedResources("numa"));
|
||||||
|
// test removing numa resources from state store
|
||||||
|
stateStore.releaseAssignedResources(containerId, "numa");
|
||||||
|
recoveredContainers = loadContainersState(stateStore.getContainerStateIterator());
|
||||||
|
resourceMappings = recoveredContainers.get(0).getResourceMappings();
|
||||||
|
assertTrue(resourceMappings.getAssignedResources("numa").isEmpty());
|
||||||
|
|
||||||
|
// verify that deleting a non-existing key doesn't break anything
|
||||||
|
try {
|
||||||
|
stateStore.releaseAssignedResources(containerId, "numa");
|
||||||
|
}catch (RuntimeException e){
|
||||||
|
Assert.fail("Should not throw exception while deleting non existing key from statestore");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
Loading…
Reference in New Issue
Block a user