diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/numa/NumaResourceAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/numa/NumaResourceAllocation.java index f8d47396e9..e91ac3e041 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/numa/NumaResourceAllocation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/numa/NumaResourceAllocation.java @@ -17,9 +17,11 @@ */ package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.numa; +import com.google.common.collect.ImmutableMap; + import java.io.Serializable; -import java.util.HashMap; import java.util.Map; +import java.util.Objects; import java.util.Set; /** @@ -28,27 +30,18 @@ import java.util.Set; */ public class NumaResourceAllocation implements Serializable { private static final long serialVersionUID = 6339719798446595123L; - private Map nodeVsMemory; - private Map nodeVsCpus; + private final ImmutableMap nodeVsMemory; + private final ImmutableMap nodeVsCpus; - public NumaResourceAllocation() { - nodeVsMemory = new HashMap<>(); - nodeVsCpus = new HashMap<>(); + public NumaResourceAllocation(Map memoryAllocations, + Map cpuAllocations) { + nodeVsMemory = ImmutableMap.copyOf(memoryAllocations); + nodeVsCpus = ImmutableMap.copyOf(cpuAllocations); } public NumaResourceAllocation(String memNodeId, long memory, String cpuNodeId, int cpus) { - this(); - nodeVsMemory.put(memNodeId, memory); - nodeVsCpus.put(cpuNodeId, cpus); - } - - public void addMemoryNode(String memNodeId, long memory) { - nodeVsMemory.put(memNodeId, memory); - } - - public void addCpuNode(String cpuNodeId, int cpus) { - nodeVsCpus.put(cpuNodeId, cpus); + this(ImmutableMap.of(memNodeId, memory), ImmutableMap.of(cpuNodeId, cpus)); } public Set getMemNodes() { @@ -59,11 +52,37 @@ public class NumaResourceAllocation implements Serializable { return nodeVsCpus.keySet(); } - public Map getNodeVsMemory() { + public ImmutableMap getNodeVsMemory() { return nodeVsMemory; } - public Map getNodeVsCpus() { + public ImmutableMap getNodeVsCpus() { return nodeVsCpus; } -} + + @Override + public String toString() { + return "NumaResourceAllocation{" + + "nodeVsMemory=" + nodeVsMemory + + ", nodeVsCpus=" + nodeVsCpus + + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + NumaResourceAllocation that = (NumaResourceAllocation) o; + return Objects.equals(nodeVsMemory, that.nodeVsMemory) && + Objects.equals(nodeVsCpus, that.nodeVsCpus); + } + + @Override + public int hashCode() { + return Objects.hash(nodeVsMemory, nodeVsCpus); + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/numa/NumaResourceAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/numa/NumaResourceAllocator.java index 08c328278f..ac55e2f9b3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/numa/NumaResourceAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/numa/NumaResourceAllocator.java @@ -29,6 +29,7 @@ import java.util.Map.Entry; import java.util.regex.Matcher; import java.util.regex.Pattern; +import com.google.common.collect.Maps; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -248,17 +249,19 @@ public class NumaResourceAllocator { // If there is no single node matched for the container resource // Check the NUMA nodes for Memory resources - NumaResourceAllocation assignedNumaNodeInfo = new NumaResourceAllocation(); - long memreq = resource.getMemorySize(); + long memoryRequirement = resource.getMemorySize(); + Map memoryAllocations = Maps.newHashMap(); for (NumaNodeResource numaNode : numaNodesList) { - long memrem = numaNode.assignAvailableMemory(memreq, containerId); - assignedNumaNodeInfo.addMemoryNode(numaNode.getNodeId(), memreq - memrem); - memreq = memrem; - if (memreq == 0) { + long memoryRemaining = numaNode. + assignAvailableMemory(memoryRequirement, containerId); + memoryAllocations.put(numaNode.getNodeId(), + memoryRequirement - memoryRemaining); + memoryRequirement = memoryRemaining; + if (memoryRequirement == 0) { break; } } - if (memreq != 0) { + if (memoryRequirement != 0) { LOG.info("There is no available memory:" + resource.getMemorySize() + " in numa nodes for " + containerId); releaseNumaResource(containerId); @@ -266,26 +269,31 @@ public class NumaResourceAllocator { } // Check the NUMA nodes for CPU resources - int cpusreq = resource.getVirtualCores(); + int cpusRequirement = resource.getVirtualCores(); + Map cpuAllocations = Maps.newHashMap(); for (int index = 0; index < numaNodesList.size(); index++) { NumaNodeResource numaNode = numaNodesList .get((currentAssignNode + index) % numaNodesList.size()); - int cpusrem = numaNode.assignAvailableCpus(cpusreq, containerId); - assignedNumaNodeInfo.addCpuNode(numaNode.getNodeId(), cpusreq - cpusrem); - cpusreq = cpusrem; - if (cpusreq == 0) { + int cpusRemaining = numaNode. + assignAvailableCpus(cpusRequirement, containerId); + cpuAllocations.put(numaNode.getNodeId(), cpusRequirement - cpusRemaining); + cpusRequirement = cpusRemaining; + if (cpusRequirement == 0) { currentAssignNode = (currentAssignNode + index + 1) % numaNodesList.size(); break; } } - if (cpusreq != 0) { + if (cpusRequirement != 0) { LOG.info("There are no available cpus:" + resource.getVirtualCores() + " in numa nodes for " + containerId); releaseNumaResource(containerId); return null; } + + NumaResourceAllocation assignedNumaNodeInfo = + new NumaResourceAllocation(memoryAllocations, cpuAllocations); LOG.info("Assigning multiple NUMA nodes (" + StringUtils.join(",", assignedNumaNodeInfo.getMemNodes()) + ") for memory, (" diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java index 951adbe551..ab62c69c20 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java @@ -1424,8 +1424,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { String keyResChng = CONTAINERS_KEY_PREFIX + container.getContainerId().toString() + CONTAINER_ASSIGNED_RESOURCES_KEY_SUFFIX + resourceType; try { - WriteBatch batch = db.createWriteBatch(); - try { + try (WriteBatch batch = db.createWriteBatch()) { ResourceMappings.AssignedResources res = new ResourceMappings.AssignedResources(); res.updateAssignedResources(assignedResources); @@ -1433,8 +1432,6 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { // New value will overwrite old values for the same key batch.put(bytes(keyResChng), res.toBytes()); db.write(batch); - } finally { - batch.close(); } } catch (DBException e) { markStoreUnHealthy(e); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java index bd86f596bc..44c3ccfbd2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java @@ -77,6 +77,9 @@ import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyTokenSecretManager; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.fpga.FpgaResourceAllocator.FpgaDevice; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.numa.NumaResourceAllocation; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.gpu.GpuDevice; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.LocalResourceTrackerState; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredAMRMProxyState; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredApplicationsState; @@ -1450,7 +1453,7 @@ public class TestNMLeveldbStateStoreService { @Test public void testStateStoreForResourceMapping() throws IOException { - // test empty when no state + // test that stateStore is initially empty List recoveredContainers = loadContainersState(stateStore.getContainerStateIterator()); assertTrue(recoveredContainers.isEmpty()); @@ -1466,38 +1469,43 @@ public class TestNMLeveldbStateStoreService { ResourceMappings resourceMappings = new ResourceMappings(); when(container.getResourceMappings()).thenReturn(resourceMappings); - // Store ResourceMapping stateStore.storeAssignedResources(container, "gpu", - Arrays.asList("1", "2", "3")); - // This will overwrite above - List gpuRes1 = Arrays.asList("1", "2", "4"); + Arrays.asList(new GpuDevice(1, 1), new GpuDevice(2, 2), + new GpuDevice(3, 3))); + + // This will overwrite the above + List gpuRes1 = Arrays.asList( + new GpuDevice(1, 1), new GpuDevice(2, 2), new GpuDevice(4, 4)); stateStore.storeAssignedResources(container, "gpu", gpuRes1); - List fpgaRes = Arrays.asList("3", "4", "5", "6"); + + List fpgaRes = Arrays.asList( + new FpgaDevice("testType", 3, 3, "testIPID"), + new FpgaDevice("testType", 4, 4, "testIPID"), + new FpgaDevice("testType", 5, 5, "testIPID"), + new FpgaDevice("testType", 6, 6, "testIPID")); stateStore.storeAssignedResources(container, "fpga", fpgaRes); - List numaRes = Arrays.asList("numa1"); + + List numaRes = Arrays.asList( + new NumaResourceAllocation("testmemNodeId", 2048, "testCpuNodeId", 10)); stateStore.storeAssignedResources(container, "numa", numaRes); - // add a invalid key restartStateStore(); recoveredContainers = loadContainersState(stateStore.getContainerStateIterator()); assertEquals(1, recoveredContainers.size()); RecoveredContainerState rcs = recoveredContainers.get(0); - List res = rcs.getResourceMappings() + List resources = rcs.getResourceMappings() .getAssignedResources("gpu"); - Assert.assertTrue(res.equals(gpuRes1)); - Assert.assertTrue( - resourceMappings.getAssignedResources("gpu").equals(gpuRes1)); + Assert.assertEquals(gpuRes1, resources); + Assert.assertEquals(gpuRes1, resourceMappings.getAssignedResources("gpu")); - res = rcs.getResourceMappings().getAssignedResources("fpga"); - Assert.assertTrue(res.equals(fpgaRes)); - Assert.assertTrue( - resourceMappings.getAssignedResources("fpga").equals(fpgaRes)); + resources = rcs.getResourceMappings().getAssignedResources("fpga"); + Assert.assertEquals(fpgaRes, resources); + Assert.assertEquals(fpgaRes, resourceMappings.getAssignedResources("fpga")); - res = rcs.getResourceMappings().getAssignedResources("numa"); - Assert.assertTrue(res.equals(numaRes)); - Assert.assertTrue( - resourceMappings.getAssignedResources("numa").equals(numaRes)); + resources = rcs.getResourceMappings().getAssignedResources("numa"); + Assert.assertEquals(numaRes, resources); + Assert.assertEquals(numaRes, resourceMappings.getAssignedResources("numa")); } @Test