diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/OpportunisticSchedulerMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/OpportunisticSchedulerMetrics.java index 6169e51744..585f8f1c0e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/OpportunisticSchedulerMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/OpportunisticSchedulerMetrics.java @@ -28,6 +28,7 @@ import org.apache.hadoop.metrics2.lib.MetricsRegistry; import org.apache.hadoop.metrics2.lib.MutableCounterLong; import org.apache.hadoop.metrics2.lib.MutableGaugeInt; +import org.apache.hadoop.metrics2.lib.MutableQuantiles; import java.util.concurrent.atomic.AtomicBoolean; @@ -86,6 +87,9 @@ private static void registerMetrics() { @Metric("Aggregate # of allocated off-switch opportunistic containers") MutableCounterLong aggregateOffSwitchOContainersAllocated; + @Metric("Aggregate latency for opportunistic container allocation") + MutableQuantiles allocateLatencyOQuantiles; + @VisibleForTesting public int getAllocatedContainers() { return allocatedOContainers.value(); @@ -138,4 +142,8 @@ public void incrRackLocalOppContainers() { public void incrOffSwitchOppContainers() { aggregateOffSwitchOContainersAllocated.incr(); } + + public void addAllocateOLatencyEntry(long latency) { + allocateLatencyOQuantiles.add(latency); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java index 10c24022da..0ce1976380 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java @@ -22,6 +22,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.util.Time; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -241,9 +242,15 @@ static class EnrichedResourceRequest { private final Map nodeLocations = new HashMap<>(); private final Map rackLocations = new HashMap<>(); private final ResourceRequest request; + private final long timestamp; EnrichedResourceRequest(ResourceRequest request) { this.request = request; + timestamp = Time.monotonicNow(); + } + + long getTimestamp() { + return timestamp; } ResourceRequest getRequest() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java index 246d450668..84e7bb9dce 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java @@ -18,9 +18,12 @@ package org.apache.hadoop.yarn.server.scheduler; +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.util.Time; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RemoteNode; +import org.apache.hadoop.yarn.server.metrics.OpportunisticSchedulerMetrics; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -191,7 +194,14 @@ public void matchAllocationToOutstandingRequest(Resource capability, err.removeLocation(allocation.getResourceName()); } } + getOppSchedulerMetrics().addAllocateOLatencyEntry( + Time.monotonicNow() - err.getTimestamp()); } } } + + @VisibleForTesting + OpportunisticSchedulerMetrics getOppSchedulerMetrics() { + return OpportunisticSchedulerMetrics.getMetrics(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestOpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestOpportunisticContainerAllocator.java index 57e397d010..548ddad6af 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestOpportunisticContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestOpportunisticContainerAllocator.java @@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.api.protocolrecords.RemoteNode; import org.apache.hadoop.yarn.server.api.records.MasterKey; +import org.apache.hadoop.yarn.server.metrics.OpportunisticSchedulerMetrics; import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager; import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.Assert; @@ -43,10 +44,18 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + public class TestOpportunisticContainerAllocator { private static final Logger LOG = @@ -54,6 +63,11 @@ public class TestOpportunisticContainerAllocator { private static final int GB = 1024; private OpportunisticContainerAllocator allocator = null; private OpportunisticContainerContext oppCntxt = null; + private static final Priority PRIORITY_NORMAL = Priority.newInstance(1); + private static final Resource CAPABILITY_1GB = + Resources.createResource(1 * GB); + private static final ExecutionTypeRequest OPPORTUNISTIC_REQ = + ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC, true); @Before public void setup() { @@ -97,10 +111,8 @@ public void testSimpleAllocation() throws Exception { ResourceBlacklistRequest.newInstance( new ArrayList<>(), new ArrayList<>()); List reqs = - Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1), - "*", Resources.createResource(1 * GB), 1, true, null, - ExecutionTypeRequest.newInstance( - ExecutionType.OPPORTUNISTIC, true))); + Arrays.asList(ResourceRequest.newInstance(PRIORITY_NORMAL, + "*", CAPABILITY_1GB, 1, true, null, OPPORTUNISTIC_REQ)); ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance( ApplicationId.newInstance(0L, 1), 1); @@ -120,10 +132,8 @@ public void testBlacklistRejection() throws Exception { ResourceBlacklistRequest.newInstance( Arrays.asList("h1", "h2"), new ArrayList<>()); List reqs = - Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1), - "*", Resources.createResource(1 * GB), 1, true, null, - ExecutionTypeRequest.newInstance( - ExecutionType.OPPORTUNISTIC, true))); + Arrays.asList(ResourceRequest.newInstance(PRIORITY_NORMAL, + "*", CAPABILITY_1GB, 1, true, null, OPPORTUNISTIC_REQ)); ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance( ApplicationId.newInstance(0L, 1), 1); @@ -147,21 +157,21 @@ public void testRoundRobinSimpleAllocation() throws Exception { List reqs = Arrays.asList( ResourceRequest.newBuilder().allocationRequestId(1) - .priority(Priority.newInstance(1)) + .priority(PRIORITY_NORMAL) .resourceName(ResourceRequest.ANY) - .capability(Resources.createResource(1 * GB)) + .capability(CAPABILITY_1GB) .relaxLocality(true) .executionType(ExecutionType.OPPORTUNISTIC).build(), ResourceRequest.newBuilder().allocationRequestId(2) - .priority(Priority.newInstance(1)) + .priority(PRIORITY_NORMAL) .resourceName(ResourceRequest.ANY) - .capability(Resources.createResource(1 * GB)) + .capability(CAPABILITY_1GB) .relaxLocality(true) .executionType(ExecutionType.OPPORTUNISTIC).build(), ResourceRequest.newBuilder().allocationRequestId(3) - .priority(Priority.newInstance(1)) + .priority(PRIORITY_NORMAL) .resourceName(ResourceRequest.ANY) - .capability(Resources.createResource(1 * GB)) + .capability(CAPABILITY_1GB) .relaxLocality(true) .executionType(ExecutionType.OPPORTUNISTIC).build()); ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance( @@ -197,45 +207,45 @@ public void testNodeLocalAllocation() throws Exception { List reqs = Arrays.asList( ResourceRequest.newBuilder().allocationRequestId(1) - .priority(Priority.newInstance(1)) + .priority(PRIORITY_NORMAL) .resourceName(ResourceRequest.ANY) - .capability(Resources.createResource(1 * GB)) + .capability(CAPABILITY_1GB) .relaxLocality(true) .executionType(ExecutionType.OPPORTUNISTIC).build(), ResourceRequest.newBuilder().allocationRequestId(2) - .priority(Priority.newInstance(1)) + .priority(PRIORITY_NORMAL) .resourceName("/r1") - .capability(Resources.createResource(1 * GB)) + .capability(CAPABILITY_1GB) .relaxLocality(true) .executionType(ExecutionType.OPPORTUNISTIC).build(), ResourceRequest.newBuilder().allocationRequestId(2) - .priority(Priority.newInstance(1)) + .priority(PRIORITY_NORMAL) .resourceName("h1") - .capability(Resources.createResource(1 * GB)) + .capability(CAPABILITY_1GB) .relaxLocality(true) .executionType(ExecutionType.OPPORTUNISTIC).build(), ResourceRequest.newBuilder().allocationRequestId(2) - .priority(Priority.newInstance(1)) + .priority(PRIORITY_NORMAL) .resourceName(ResourceRequest.ANY) - .capability(Resources.createResource(1 * GB)) + .capability(CAPABILITY_1GB) .relaxLocality(true) .executionType(ExecutionType.OPPORTUNISTIC).build(), ResourceRequest.newBuilder().allocationRequestId(3) - .priority(Priority.newInstance(1)) + .priority(PRIORITY_NORMAL) .resourceName("/r1") - .capability(Resources.createResource(1 * GB)) + .capability(CAPABILITY_1GB) .relaxLocality(true) .executionType(ExecutionType.OPPORTUNISTIC).build(), ResourceRequest.newBuilder().allocationRequestId(3) - .priority(Priority.newInstance(1)) + .priority(PRIORITY_NORMAL) .resourceName("h1") - .capability(Resources.createResource(1 * GB)) + .capability(CAPABILITY_1GB) .relaxLocality(true) .executionType(ExecutionType.OPPORTUNISTIC).build(), ResourceRequest.newBuilder().allocationRequestId(3) - .priority(Priority.newInstance(1)) + .priority(PRIORITY_NORMAL) .resourceName(ResourceRequest.ANY) - .capability(Resources.createResource(1 * GB)) + .capability(CAPABILITY_1GB) .relaxLocality(true) .executionType(ExecutionType.OPPORTUNISTIC).build()); ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance( @@ -272,23 +282,23 @@ public void testNodeLocalAllocationSameSchedKey() throws Exception { Arrays.asList( ResourceRequest.newBuilder().allocationRequestId(2) .numContainers(2) - .priority(Priority.newInstance(1)) + .priority(PRIORITY_NORMAL) .resourceName("/r1") - .capability(Resources.createResource(1 * GB)) + .capability(CAPABILITY_1GB) .relaxLocality(true) .executionType(ExecutionType.OPPORTUNISTIC).build(), ResourceRequest.newBuilder().allocationRequestId(2) .numContainers(2) - .priority(Priority.newInstance(1)) + .priority(PRIORITY_NORMAL) .resourceName("h1") - .capability(Resources.createResource(1 * GB)) + .capability(CAPABILITY_1GB) .relaxLocality(true) .executionType(ExecutionType.OPPORTUNISTIC).build(), ResourceRequest.newBuilder().allocationRequestId(2) .numContainers(2) - .priority(Priority.newInstance(1)) + .priority(PRIORITY_NORMAL) .resourceName(ResourceRequest.ANY) - .capability(Resources.createResource(1 * GB)) + .capability(CAPABILITY_1GB) .relaxLocality(true) .executionType(ExecutionType.OPPORTUNISTIC).build()); ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance( @@ -323,18 +333,12 @@ public void testSimpleRackLocalAllocation() throws Exception { new ArrayList<>(), new ArrayList<>()); List reqs = Arrays.asList( - ResourceRequest.newInstance(Priority.newInstance(1), "*", - Resources.createResource(1 * GB), 1, true, null, - ExecutionTypeRequest.newInstance( - ExecutionType.OPPORTUNISTIC, true)), - ResourceRequest.newInstance(Priority.newInstance(1), "h1", - Resources.createResource(1 * GB), 1, true, null, - ExecutionTypeRequest.newInstance( - ExecutionType.OPPORTUNISTIC, true)), - ResourceRequest.newInstance(Priority.newInstance(1), "/r1", - Resources.createResource(1 * GB), 1, true, null, - ExecutionTypeRequest.newInstance( - ExecutionType.OPPORTUNISTIC, true))); + ResourceRequest.newInstance(PRIORITY_NORMAL, "*", + CAPABILITY_1GB, 1, true, null, OPPORTUNISTIC_REQ), + ResourceRequest.newInstance(PRIORITY_NORMAL, "h1", + CAPABILITY_1GB, 1, true, null, OPPORTUNISTIC_REQ), + ResourceRequest.newInstance(PRIORITY_NORMAL, "/r1", + CAPABILITY_1GB, 1, true, null, OPPORTUNISTIC_REQ)); ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance( ApplicationId.newInstance(0L, 1), 1); @@ -367,39 +371,39 @@ public void testRoundRobinRackLocalAllocation() throws Exception { List reqs = Arrays.asList( ResourceRequest.newBuilder().allocationRequestId(1) - .priority(Priority.newInstance(1)) + .priority(PRIORITY_NORMAL) .resourceName("/r1") - .capability(Resources.createResource(1 * GB)) + .capability(CAPABILITY_1GB) .relaxLocality(true) .executionType(ExecutionType.OPPORTUNISTIC).build(), ResourceRequest.newBuilder().allocationRequestId(1) - .priority(Priority.newInstance(1)) + .priority(PRIORITY_NORMAL) .resourceName("h1") - .capability(Resources.createResource(1 * GB)) + .capability(CAPABILITY_1GB) .relaxLocality(true) .executionType(ExecutionType.OPPORTUNISTIC).build(), ResourceRequest.newBuilder().allocationRequestId(1) - .priority(Priority.newInstance(1)) + .priority(PRIORITY_NORMAL) .resourceName(ResourceRequest.ANY) - .capability(Resources.createResource(1 * GB)) + .capability(CAPABILITY_1GB) .relaxLocality(true) .executionType(ExecutionType.OPPORTUNISTIC).build(), ResourceRequest.newBuilder().allocationRequestId(2) - .priority(Priority.newInstance(1)) + .priority(PRIORITY_NORMAL) .resourceName("/r1") - .capability(Resources.createResource(1 * GB)) + .capability(CAPABILITY_1GB) .relaxLocality(true) .executionType(ExecutionType.OPPORTUNISTIC).build(), ResourceRequest.newBuilder().allocationRequestId(2) - .priority(Priority.newInstance(1)) + .priority(PRIORITY_NORMAL) .resourceName("h1") - .capability(Resources.createResource(1 * GB)) + .capability(CAPABILITY_1GB) .relaxLocality(true) .executionType(ExecutionType.OPPORTUNISTIC).build(), ResourceRequest.newBuilder().allocationRequestId(2) - .priority(Priority.newInstance(1)) + .priority(PRIORITY_NORMAL) .resourceName(ResourceRequest.ANY) - .capability(Resources.createResource(1 * GB)) + .capability(CAPABILITY_1GB) .relaxLocality(true) .executionType(ExecutionType.OPPORTUNISTIC).build()); ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance( @@ -437,18 +441,12 @@ public void testRoundRobinRackLocalAllocationSameSchedKey() throws Exception { new ArrayList<>(), new ArrayList<>()); List reqs = Arrays.asList( - ResourceRequest.newInstance(Priority.newInstance(1), "*", - Resources.createResource(1 * GB), 2, true, null, - ExecutionTypeRequest.newInstance( - ExecutionType.OPPORTUNISTIC, true)), - ResourceRequest.newInstance(Priority.newInstance(1), "h1", - Resources.createResource(1 * GB), 2, true, null, - ExecutionTypeRequest.newInstance( - ExecutionType.OPPORTUNISTIC, true)), - ResourceRequest.newInstance(Priority.newInstance(1), "/r1", - Resources.createResource(1 * GB), 2, true, null, - ExecutionTypeRequest.newInstance( - ExecutionType.OPPORTUNISTIC, true))); + ResourceRequest.newInstance(PRIORITY_NORMAL, "*", + CAPABILITY_1GB, 2, true, null, OPPORTUNISTIC_REQ), + ResourceRequest.newInstance(PRIORITY_NORMAL, "h1", + CAPABILITY_1GB, 2, true, null, OPPORTUNISTIC_REQ), + ResourceRequest.newInstance(PRIORITY_NORMAL, "/r1", + CAPABILITY_1GB, 2, true, null, OPPORTUNISTIC_REQ)); ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance( ApplicationId.newInstance(0L, 1), 1); @@ -484,18 +482,12 @@ public void testOffSwitchAllocationWhenNoNodeOrRack() throws Exception { new ArrayList<>(), new ArrayList<>()); List reqs = Arrays.asList( - ResourceRequest.newInstance(Priority.newInstance(1), "*", - Resources.createResource(1 * GB), 2, true, null, - ExecutionTypeRequest.newInstance( - ExecutionType.OPPORTUNISTIC, true)), - ResourceRequest.newInstance(Priority.newInstance(1), "h6", - Resources.createResource(1 * GB), 2, true, null, - ExecutionTypeRequest.newInstance( - ExecutionType.OPPORTUNISTIC, true)), - ResourceRequest.newInstance(Priority.newInstance(1), "/r3", - Resources.createResource(1 * GB), 2, true, null, - ExecutionTypeRequest.newInstance( - ExecutionType.OPPORTUNISTIC, true))); + ResourceRequest.newInstance(PRIORITY_NORMAL, "*", + CAPABILITY_1GB, 2, true, null, OPPORTUNISTIC_REQ), + ResourceRequest.newInstance(PRIORITY_NORMAL, "h6", + CAPABILITY_1GB, 2, true, null, OPPORTUNISTIC_REQ), + ResourceRequest.newInstance(PRIORITY_NORMAL, "/r3", + CAPABILITY_1GB, 2, true, null, OPPORTUNISTIC_REQ)); ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance( ApplicationId.newInstance(0L, 1), 1); @@ -524,18 +516,12 @@ public void testLotsOfContainersRackLocalAllocationSameSchedKey() new ArrayList<>(), new ArrayList<>()); List reqs = Arrays.asList( - ResourceRequest.newInstance(Priority.newInstance(1), "*", - Resources.createResource(1 * GB), 1000, true, null, - ExecutionTypeRequest.newInstance( - ExecutionType.OPPORTUNISTIC, true)), - ResourceRequest.newInstance(Priority.newInstance(1), "h1", - Resources.createResource(1 * GB), 1000, true, null, - ExecutionTypeRequest.newInstance( - ExecutionType.OPPORTUNISTIC, true)), - ResourceRequest.newInstance(Priority.newInstance(1), "/r1", - Resources.createResource(1 * GB), 1000, true, null, - ExecutionTypeRequest.newInstance( - ExecutionType.OPPORTUNISTIC, true))); + ResourceRequest.newInstance(PRIORITY_NORMAL, "*", + CAPABILITY_1GB, 1000, true, null, OPPORTUNISTIC_REQ), + ResourceRequest.newInstance(PRIORITY_NORMAL, "h1", + CAPABILITY_1GB, 1000, true, null, OPPORTUNISTIC_REQ), + ResourceRequest.newInstance(PRIORITY_NORMAL, "/r1", + CAPABILITY_1GB, 1000, true, null, OPPORTUNISTIC_REQ)); ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance( ApplicationId.newInstance(0L, 1), 1); @@ -567,21 +553,21 @@ public void testLotsOfContainersRackLocalAllocation() List reqs = new ArrayList<>(); for (int i = 0; i < 100; i++) { reqs.add(ResourceRequest.newBuilder().allocationRequestId(i + 1) - .priority(Priority.newInstance(1)) + .priority(PRIORITY_NORMAL) .resourceName("*") - .capability(Resources.createResource(1 * GB)) + .capability(CAPABILITY_1GB) .relaxLocality(true) .executionType(ExecutionType.OPPORTUNISTIC).build()); reqs.add(ResourceRequest.newBuilder().allocationRequestId(i + 1) - .priority(Priority.newInstance(1)) + .priority(PRIORITY_NORMAL) .resourceName("h1") - .capability(Resources.createResource(1 * GB)) + .capability(CAPABILITY_1GB) .relaxLocality(true) .executionType(ExecutionType.OPPORTUNISTIC).build()); reqs.add(ResourceRequest.newBuilder().allocationRequestId(i + 1) - .priority(Priority.newInstance(1)) + .priority(PRIORITY_NORMAL) .resourceName("/r1") - .capability(Resources.createResource(1 * GB)) + .capability(CAPABILITY_1GB) .relaxLocality(true) .executionType(ExecutionType.OPPORTUNISTIC).build()); } @@ -613,10 +599,8 @@ public void testAllocationWithNodeLabels() throws Exception { ResourceBlacklistRequest.newInstance( new ArrayList<>(), new ArrayList<>()); List reqs = - Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1), - "*", Resources.createResource(1 * GB), 1, true, "label", - ExecutionTypeRequest.newInstance( - ExecutionType.OPPORTUNISTIC, true))); + Arrays.asList(ResourceRequest.newInstance(PRIORITY_NORMAL, + "*", CAPABILITY_1GB, 1, true, "label", OPPORTUNISTIC_REQ)); ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance( ApplicationId.newInstance(0L, 1), 1); @@ -655,18 +639,13 @@ public void testMaxAllocationsPerAMHeartbeat() throws Exception { ResourceBlacklistRequest.newInstance( new ArrayList<>(), new ArrayList<>()); allocator.setMaxAllocationsPerAMHeartbeat(2); - final Priority priority = Priority.newInstance(1); - final ExecutionTypeRequest oppRequest = ExecutionTypeRequest.newInstance( - ExecutionType.OPPORTUNISTIC, true); - final Resource resource = Resources.createResource(1 * GB); - List reqs = - Arrays.asList( - ResourceRequest.newInstance(priority, "*", - resource, 3, true, null, oppRequest), - ResourceRequest.newInstance(priority, "h6", - resource, 3, true, null, oppRequest), - ResourceRequest.newInstance(priority, "/r3", - resource, 3, true, null, oppRequest)); + List reqs = Arrays.asList( + ResourceRequest.newInstance(PRIORITY_NORMAL, "*", CAPABILITY_1GB, 3, + true, null, OPPORTUNISTIC_REQ), + ResourceRequest.newInstance(PRIORITY_NORMAL, "h6", CAPABILITY_1GB, 3, + true, null, OPPORTUNISTIC_REQ), + ResourceRequest.newInstance(PRIORITY_NORMAL, "/r3", CAPABILITY_1GB, 3, + true, null, OPPORTUNISTIC_REQ)); ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance( ApplicationId.newInstance(0L, 1), 1); @@ -708,15 +687,14 @@ public void testMaxAllocationsPerAMHeartbeatDifferentSchedKey() allocator.setMaxAllocationsPerAMHeartbeat(2); final ExecutionTypeRequest oppRequest = ExecutionTypeRequest.newInstance( ExecutionType.OPPORTUNISTIC, true); - final Resource resource = Resources.createResource(1 * GB); List reqs = Arrays.asList( ResourceRequest.newInstance(Priority.newInstance(1), "*", - resource, 1, true, null, oppRequest), + CAPABILITY_1GB, 1, true, null, OPPORTUNISTIC_REQ), ResourceRequest.newInstance(Priority.newInstance(2), "h6", - resource, 2, true, null, oppRequest), + CAPABILITY_1GB, 2, true, null, OPPORTUNISTIC_REQ), ResourceRequest.newInstance(Priority.newInstance(3), "/r3", - resource, 2, true, null, oppRequest)); + CAPABILITY_1GB, 2, true, null, OPPORTUNISTIC_REQ)); ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance( ApplicationId.newInstance(0L, 1), 1); @@ -761,14 +739,12 @@ public void testMaxAllocationsPerAMHeartbeatWithNoLimit() throws Exception { new ArrayList<>(), new ArrayList<>()); allocator.setMaxAllocationsPerAMHeartbeat(-1); - Priority priority = Priority.newInstance(1); - Resource capability = Resources.createResource(1 * GB); List reqs = new ArrayList<>(); for (int i = 0; i < 20; i++) { reqs.add(ResourceRequest.newBuilder().allocationRequestId(i + 1) - .priority(priority) + .priority(PRIORITY_NORMAL) .resourceName("h1") - .capability(capability) + .capability(CAPABILITY_1GB) .relaxLocality(true) .executionType(ExecutionType.OPPORTUNISTIC).build()); } @@ -802,14 +778,12 @@ public void testMaxAllocationsPerAMHeartbeatWithHighLimit() new ArrayList<>(), new ArrayList<>()); allocator.setMaxAllocationsPerAMHeartbeat(100); - Priority priority = Priority.newInstance(1); - Resource capability = Resources.createResource(1 * GB); List reqs = new ArrayList<>(); for (int i = 0; i < 20; i++) { reqs.add(ResourceRequest.newBuilder().allocationRequestId(i + 1) - .priority(priority) + .priority(PRIORITY_NORMAL) .resourceName("h1") - .capability(capability) + .capability(CAPABILITY_1GB) .relaxLocality(true) .executionType(ExecutionType.OPPORTUNISTIC).build()); } @@ -829,4 +803,46 @@ public void testMaxAllocationsPerAMHeartbeatWithHighLimit() // all containers should be allocated in single heartbeat. Assert.assertEquals(20, containers.size()); } + + /** + * Test opportunistic container allocation latency metrics. + * @throws Exception + */ + @Test + public void testAllocationLatencyMetrics() throws Exception { + oppCntxt = spy(oppCntxt); + OpportunisticSchedulerMetrics metrics = + mock(OpportunisticSchedulerMetrics.class); + when(oppCntxt.getOppSchedulerMetrics()).thenReturn(metrics); + ResourceBlacklistRequest blacklistRequest = + ResourceBlacklistRequest.newInstance( + Collections.emptyList(), Collections.emptyList()); + List reqs = Arrays.asList( + ResourceRequest.newInstance(PRIORITY_NORMAL, "*", CAPABILITY_1GB, 2, + true, null, OPPORTUNISTIC_REQ), + ResourceRequest.newInstance(PRIORITY_NORMAL, "h6", CAPABILITY_1GB, 2, + true, null, OPPORTUNISTIC_REQ), + ResourceRequest.newInstance(PRIORITY_NORMAL, "/r3", CAPABILITY_1GB, 2, + true, null, OPPORTUNISTIC_REQ)); + ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance( + ApplicationId.newInstance(0L, 1), 1); + + oppCntxt.updateNodeList( + Arrays.asList( + RemoteNode.newInstance( + NodeId.newInstance("h3", 1234), "h3:1234", "/r2"), + RemoteNode.newInstance( + NodeId.newInstance("h2", 1234), "h2:1234", "/r1"), + RemoteNode.newInstance( + NodeId.newInstance("h5", 1234), "h5:1234", "/r1"), + RemoteNode.newInstance( + NodeId.newInstance("h4", 1234), "h4:1234", "/r2"))); + + List containers = allocator.allocateContainers( + blacklistRequest, reqs, appAttId, oppCntxt, 1L, "luser"); + LOG.info("Containers: {}", containers); + Assert.assertEquals(2, containers.size()); + // for each allocated container, latency should be added. + verify(metrics, times(2)).addAllocateOLatencyEntry(anyLong()); + } }