From 6bf42e48ef658bf6dd86ebd706562ce7cc06216a Mon Sep 17 00:00:00 2001 From: Subru Krishnan Date: Mon, 1 May 2017 18:48:36 -0700 Subject: [PATCH] YARN-5331. Extend RLESparseResourceAllocation with period for supporting recurring reservations in YARN ReservationSystem. (Sangeetha Abdu Jyothi via Subru). --- .../PeriodicRLESparseResourceAllocation.java | 167 ++++++++++++++++++ .../RLESparseResourceAllocation.java | 51 ++++++ .../ReservationSystemTestUtil.java | 14 ++ ...stPeriodicRLESparseResourceAllocation.java | 142 +++++++++++++++ .../TestRLESparseResourceAllocation.java | 56 +++++- 5 files changed, 429 insertions(+), 1 deletion(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PeriodicRLESparseResourceAllocation.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestPeriodicRLESparseResourceAllocation.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PeriodicRLESparseResourceAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PeriodicRLESparseResourceAllocation.java new file mode 100644 index 0000000000..8e3be8b3da --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PeriodicRLESparseResourceAllocation.java @@ -0,0 +1,167 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.reservation; + +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.util.resource.Resources; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This data structure stores a periodic RLESparseResourceAllocation. + * Default period is 1 day (86400000ms). + */ +public class PeriodicRLESparseResourceAllocation extends + RLESparseResourceAllocation { + + // Log + private static final Logger LOG = LoggerFactory + .getLogger(PeriodicRLESparseResourceAllocation.class); + + private long timePeriod; + + /** + * Constructor. + * + * @param rleVector {@link RLESparseResourceAllocation} with the run-length + encoded data. + * @param timePeriod Time period in milliseconds. + */ + public PeriodicRLESparseResourceAllocation( + RLESparseResourceAllocation rleVector, Long timePeriod) { + super(rleVector.getCumulative(), rleVector.getResourceCalculator()); + this.timePeriod = timePeriod; + } + + /** + * Constructor. Default time period set to 1 day. + * + * @param rleVector {@link RLESparseResourceAllocation} with the run-length + encoded data. + */ + public PeriodicRLESparseResourceAllocation( + RLESparseResourceAllocation rleVector) { + this(rleVector, 86400000L); + } + + /** + * Get capacity at time based on periodic repetition. + * + * @param tick UTC time for which the allocated {@link Resource} is queried. + * @return {@link Resource} allocated at specified time + */ + public Resource getCapacityAtTime(long tick) { + long convertedTime = (tick % timePeriod); + return super.getCapacityAtTime(convertedTime); + } + + /** + * Add resource for the specified interval. This function will be used by + * {@link InMemoryPlan} while placing reservations between 0 and timePeriod. + * The interval may include 0, but the end time must be strictly less than + * timePeriod. + * + * @param interval {@link ReservationInterval} to which the specified + * resource is to be added. + * @param resource {@link Resource} to be added to the interval specified. + * @return true if addition is successful, false otherwise + */ + public boolean addInterval(ReservationInterval interval, + Resource resource) { + long startTime = interval.getStartTime(); + long endTime = interval.getEndTime(); + if (startTime >= 0 && endTime > startTime && endTime <= timePeriod) { + return super.addInterval(interval, resource); + } else { + LOG.info("Cannot set capacity beyond end time: " + timePeriod); + return false; + } + } + + /** + * Removes a resource for the specified interval. + * + * @param interval the {@link ReservationInterval} for which the resource is + * to be removed. + * @param resource the {@link Resource} to be removed. + * @return true if removal is successful, false otherwise + */ + public boolean removeInterval( + ReservationInterval interval, Resource resource) { + long startTime = interval.getStartTime(); + long endTime = interval.getEndTime(); + // If the resource to be subtracted is less than the minimum resource in + // the range, abort removal to avoid negative capacity. + if (!Resources.fitsIn( + resource, super.getMinimumCapacityInInterval(interval))) { + LOG.info("Request to remove more resources than what is available"); + return false; + } + if (startTime >= 0 && endTime > startTime && endTime <= timePeriod) { + return super.removeInterval(interval, resource); + } else { + LOG.info("Interval extends beyond the end time " + timePeriod); + return false; + } + } + + /** + * Get maximum capacity at periodic offsets from the specified time. + * + * @param tick UTC time base from which offsets are specified for finding + * the maximum capacity. + * @param period periodic offset at which capacities are evaluted. + * @return the maximum {@link Resource} across the specified time instants. + * @return true if removal is successful, false otherwise + */ + public Resource getMaximumPeriodicCapacity(long tick, long period) { + Resource maxResource; + if (period < timePeriod) { + maxResource = + super.getMaximumPeriodicCapacity(tick % timePeriod, period); + } else { + // if period is greater than the length of PeriodicRLESparseAllocation, + // only a single value exists in this interval. + maxResource = super.getCapacityAtTime(tick % timePeriod); + } + return maxResource; + } + + /** + * Get time period of PeriodicRLESparseResourceAllocation. + * + * @return timePeriod time period represented in ms. + */ + public long getTimePeriod() { + return this.timePeriod; + } + + @Override + public String toString() { + StringBuilder ret = new StringBuilder(); + ret.append("Period: ").append(timePeriod).append("\n") + .append(super.toString()); + if (super.isEmpty()) { + ret.append(" no allocations\n"); + } + return ret.toString(); + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java index c18a93ed1b..658387b094 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java @@ -132,6 +132,7 @@ public boolean removeInterval(ReservationInterval reservationInterval, * Returns the capacity, i.e. total resources allocated at the specified point * of time. * + * @param tick timeStap at which resource needs to be known * @return the resources allocated at the specified time */ public Resource getCapacityAtTime(long tick) { @@ -309,6 +310,10 @@ public NavigableMap getCumulative() { } } + public ResourceCalculator getResourceCalculator() { + return resourceCalculator; + } + /** * Merges the range start to end of two {@code RLESparseResourceAllocation} * using a given {@code RLEOperator}. @@ -533,4 +538,50 @@ public enum RLEOperator { add, subtract, min, max, subtractTestNonNegative } + /** + * Get the maximum capacity across specified time instances. The search-space + * is specified using the starting value, tick, and the periodic interval for + * search. Maximum resource allocation across tick, tick + period, + * tick + 2 * period,..., tick + n * period .. is returned. + * + * @param tick the starting time instance + * @param period interval at which capacity is evaluated + * @return maximum resource allocation + */ + public Resource getMaximumPeriodicCapacity(long tick, long period) { + Resource maxCapacity = ZERO_RESOURCE; + if (!cumulativeCapacity.isEmpty()) { + Long lastKey = cumulativeCapacity.lastKey(); + for (long t = tick; t <= lastKey; t = t + period) { + maxCapacity = Resources.componentwiseMax(maxCapacity, + cumulativeCapacity.floorEntry(t).getValue()); + } + } + return maxCapacity; + } + + /** + * Get the minimum capacity in the specified time range. + * + * @param interval the {@link ReservationInterval} to be searched + * @return minimum resource allocation + */ + public Resource getMinimumCapacityInInterval(ReservationInterval interval) { + Resource minCapacity = Resource.newInstance( + Integer.MAX_VALUE, Integer.MAX_VALUE); + long start = interval.getStartTime(); + long end = interval.getEndTime(); + NavigableMap capacityRange = + this.getRangeOverlapping(start, end).getCumulative(); + if (!capacityRange.isEmpty()) { + for (Map.Entry entry : capacityRange.entrySet()) { + if (entry.getValue() != null) { + minCapacity = Resources.componentwiseMin(minCapacity, + entry.getValue()); + } + } + } + return minCapacity; + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java index 1ff6a1a818..e99842e29e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java @@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; +import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.junit.Assert; import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; @@ -414,6 +415,19 @@ public static Map generateAllocation( return req; } + public static RLESparseResourceAllocation + generateRLESparseResourceAllocation(int[] alloc, long[] timeSteps) { + TreeMap allocationsMap = new TreeMap<>(); + for (int i = 0; i < alloc.length; i++) { + allocationsMap.put(timeSteps[i], + Resource.newInstance(alloc[i], alloc[i])); + } + RLESparseResourceAllocation rleVector = + new RLESparseResourceAllocation(allocationsMap, + new DefaultResourceCalculator()); + return rleVector; + } + public static Resource calculateClusterResource(int numContainers) { return Resource.newInstance(numContainers * 1024, numContainers); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestPeriodicRLESparseResourceAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestPeriodicRLESparseResourceAllocation.java new file mode 100644 index 0000000000..554eb58954 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestPeriodicRLESparseResourceAllocation.java @@ -0,0 +1,142 @@ +/****************************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + *****************************************************************************/ + +package org.apache.hadoop.yarn.server.resourcemanager.reservation; + +import org.apache.hadoop.yarn.api.records.Resource; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Testing the class PeriodicRLESparseResourceAllocation. + */ +public class TestPeriodicRLESparseResourceAllocation { + + private static final Logger LOG = LoggerFactory + .getLogger(TestPeriodicRLESparseResourceAllocation.class); + + @Test + public void testPeriodicCapacity() { + int[] alloc = {10, 7, 5, 2, 0}; + long[] timeSteps = {0L, 5L, 10L, 15L, 19L}; + RLESparseResourceAllocation rleSparseVector = + ReservationSystemTestUtil.generateRLESparseResourceAllocation( + alloc, timeSteps); + PeriodicRLESparseResourceAllocation periodicVector = + new PeriodicRLESparseResourceAllocation(rleSparseVector, 20L); + LOG.info(periodicVector.toString()); + Assert.assertEquals(Resource.newInstance(5, 5), + periodicVector.getCapacityAtTime(10L)); + Assert.assertEquals(Resource.newInstance(10, 10), + periodicVector.getCapacityAtTime(20L)); + Assert.assertEquals(Resource.newInstance(7, 7), + periodicVector.getCapacityAtTime(27L)); + Assert.assertEquals(Resource.newInstance(5, 5), + periodicVector.getCapacityAtTime(50L)); + } + + @Test + public void testMaxPeriodicCapacity() { + int[] alloc = {2, 5, 7, 10, 3, 4, 6, 8}; + long[] timeSteps = {0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L}; + RLESparseResourceAllocation rleSparseVector = + ReservationSystemTestUtil.generateRLESparseResourceAllocation( + alloc, timeSteps); + PeriodicRLESparseResourceAllocation periodicVector = + new PeriodicRLESparseResourceAllocation(rleSparseVector, 8L); + LOG.info(periodicVector.toString()); + Assert.assertEquals( + periodicVector.getMaximumPeriodicCapacity(0, 1), + Resource.newInstance(10, 10)); + Assert.assertEquals( + periodicVector.getMaximumPeriodicCapacity(8, 2), + Resource.newInstance(7, 7)); + Assert.assertEquals( + periodicVector.getMaximumPeriodicCapacity(16, 3), + Resource.newInstance(10, 10)); + Assert.assertEquals( + periodicVector.getMaximumPeriodicCapacity(17, 4), + Resource.newInstance(5, 5)); + Assert.assertEquals( + periodicVector.getMaximumPeriodicCapacity(32, 5), + Resource.newInstance(4, 4)); + } + + @Test + public void testSetCapacityInInterval() { + int[] alloc = {2, 5, 0}; + long[] timeSteps = {1L, 2L, 3L}; + RLESparseResourceAllocation rleSparseVector = + ReservationSystemTestUtil.generateRLESparseResourceAllocation( + alloc, timeSteps); + PeriodicRLESparseResourceAllocation periodicVector = + new PeriodicRLESparseResourceAllocation(rleSparseVector, 10L); + ReservationInterval interval = new ReservationInterval(5L, 10L); + periodicVector.addInterval( + interval, Resource.newInstance(8, 8)); + Assert.assertEquals(Resource.newInstance(8, 8), + periodicVector.getCapacityAtTime(5L)); + Assert.assertEquals(Resource.newInstance(8, 8), + periodicVector.getCapacityAtTime(9L)); + Assert.assertEquals(Resource.newInstance(0, 0), + periodicVector.getCapacityAtTime(10L)); + Assert.assertEquals(Resource.newInstance(0, 0), + periodicVector.getCapacityAtTime(0L)); + Assert.assertFalse(periodicVector.addInterval( + new ReservationInterval(7L, 12L), Resource.newInstance(8, 8))); + } + + public void testRemoveInterval() { + int[] alloc = {2, 5, 3, 4, 0}; + long[] timeSteps = {1L, 3L, 5L, 7L, 9L}; + RLESparseResourceAllocation rleSparseVector = + ReservationSystemTestUtil.generateRLESparseResourceAllocation( + alloc, timeSteps); + PeriodicRLESparseResourceAllocation periodicVector = + new PeriodicRLESparseResourceAllocation(rleSparseVector, 10L); + ReservationInterval interval = new ReservationInterval(3L, 7L); + Assert.assertTrue(periodicVector.removeInterval( + interval, Resource.newInstance(3, 3))); + Assert.assertEquals(Resource.newInstance(2, 2), + periodicVector.getCapacityAtTime(1L)); + Assert.assertEquals(Resource.newInstance(2, 2), + periodicVector.getCapacityAtTime(2L)); + Assert.assertEquals(Resource.newInstance(2, 2), + periodicVector.getCapacityAtTime(3L)); + Assert.assertEquals(Resource.newInstance(2, 2), + periodicVector.getCapacityAtTime(4L)); + Assert.assertEquals(Resource.newInstance(0, 0), + periodicVector.getCapacityAtTime(5L)); + Assert.assertEquals(Resource.newInstance(0, 0), + periodicVector.getCapacityAtTime(6L)); + Assert.assertEquals(Resource.newInstance(4, 4), + periodicVector.getCapacityAtTime(7L)); + + // invalid interval + Assert.assertFalse(periodicVector.removeInterval( + new ReservationInterval(7L, 12L), Resource.newInstance(1, 1))); + + // invalid capacity + Assert.assertFalse(periodicVector.removeInterval( + new ReservationInterval(2L, 4L), Resource.newInstance(8, 8))); + + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestRLESparseResourceAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestRLESparseResourceAllocation.java index f8d2a4ad92..bfe46e17bd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestRLESparseResourceAllocation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestRLESparseResourceAllocation.java @@ -524,7 +524,61 @@ public void testToIntervalMap() { } } - private void setupArrays(TreeMap a, TreeMap b) { + @Test + public void testMaxPeriodicCapacity() { + long[] timeSteps = {0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L}; + int[] alloc = {2, 5, 7, 10, 3, 4, 6, 8}; + RLESparseResourceAllocation rleSparseVector = + ReservationSystemTestUtil.generateRLESparseResourceAllocation( + alloc, timeSteps); + LOG.info(rleSparseVector.toString()); + Assert.assertEquals( + rleSparseVector.getMaximumPeriodicCapacity(0, 1), + Resource.newInstance(10, 10)); + Assert.assertEquals( + rleSparseVector.getMaximumPeriodicCapacity(0, 2), + Resource.newInstance(7, 7)); + Assert.assertEquals( + rleSparseVector.getMaximumPeriodicCapacity(0, 3), + Resource.newInstance(10, 10)); + Assert.assertEquals( + rleSparseVector.getMaximumPeriodicCapacity(0, 4), + Resource.newInstance(3, 3)); + Assert.assertEquals( + rleSparseVector.getMaximumPeriodicCapacity(0, 5), + Resource.newInstance(4, 4)); + Assert.assertEquals( + rleSparseVector.getMaximumPeriodicCapacity(0, 5), + Resource.newInstance(4, 4)); + Assert.assertEquals( + rleSparseVector.getMaximumPeriodicCapacity(7, 5), + Resource.newInstance(8, 8)); + Assert.assertEquals( + rleSparseVector.getMaximumPeriodicCapacity(10, 3), + Resource.newInstance(0, 0)); + } + + @Test + public void testGetMinimumCapacityInInterval() { + long[] timeSteps = {0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L}; + int[] alloc = {2, 5, 7, 10, 3, 4, 0, 8}; + RLESparseResourceAllocation rleSparseVector = + ReservationSystemTestUtil.generateRLESparseResourceAllocation( + alloc, timeSteps); + LOG.info(rleSparseVector.toString()); + Assert.assertEquals( + rleSparseVector.getMinimumCapacityInInterval( + new ReservationInterval(1L, 3L)), Resource.newInstance(5, 5)); + Assert.assertEquals( + rleSparseVector.getMinimumCapacityInInterval( + new ReservationInterval(2L, 5L)), Resource.newInstance(3, 3)); + Assert.assertEquals( + rleSparseVector.getMinimumCapacityInInterval( + new ReservationInterval(1L, 7L)), Resource.newInstance(0, 0)); + } + + private void setupArrays( + TreeMap a, TreeMap b) { a.put(10L, Resource.newInstance(5, 5)); a.put(20L, Resource.newInstance(10, 10)); a.put(30L, Resource.newInstance(15, 15));