YARN-5331. Extend RLESparseResourceAllocation with period for supporting recurring reservations in YARN ReservationSystem. (Sangeetha Abdu Jyothi via Subru).
This commit is contained in:
parent
e514fc432a
commit
6bf42e48ef
@ -0,0 +1,167 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* This data structure stores a periodic RLESparseResourceAllocation.
|
||||
* Default period is 1 day (86400000ms).
|
||||
*/
|
||||
public class PeriodicRLESparseResourceAllocation extends
|
||||
RLESparseResourceAllocation {
|
||||
|
||||
// Log
|
||||
private static final Logger LOG = LoggerFactory
|
||||
.getLogger(PeriodicRLESparseResourceAllocation.class);
|
||||
|
||||
private long timePeriod;
|
||||
|
||||
/**
|
||||
* Constructor.
|
||||
*
|
||||
* @param rleVector {@link RLESparseResourceAllocation} with the run-length
|
||||
encoded data.
|
||||
* @param timePeriod Time period in milliseconds.
|
||||
*/
|
||||
public PeriodicRLESparseResourceAllocation(
|
||||
RLESparseResourceAllocation rleVector, Long timePeriod) {
|
||||
super(rleVector.getCumulative(), rleVector.getResourceCalculator());
|
||||
this.timePeriod = timePeriod;
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructor. Default time period set to 1 day.
|
||||
*
|
||||
* @param rleVector {@link RLESparseResourceAllocation} with the run-length
|
||||
encoded data.
|
||||
*/
|
||||
public PeriodicRLESparseResourceAllocation(
|
||||
RLESparseResourceAllocation rleVector) {
|
||||
this(rleVector, 86400000L);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get capacity at time based on periodic repetition.
|
||||
*
|
||||
* @param tick UTC time for which the allocated {@link Resource} is queried.
|
||||
* @return {@link Resource} allocated at specified time
|
||||
*/
|
||||
public Resource getCapacityAtTime(long tick) {
|
||||
long convertedTime = (tick % timePeriod);
|
||||
return super.getCapacityAtTime(convertedTime);
|
||||
}
|
||||
|
||||
/**
|
||||
* Add resource for the specified interval. This function will be used by
|
||||
* {@link InMemoryPlan} while placing reservations between 0 and timePeriod.
|
||||
* The interval may include 0, but the end time must be strictly less than
|
||||
* timePeriod.
|
||||
*
|
||||
* @param interval {@link ReservationInterval} to which the specified
|
||||
* resource is to be added.
|
||||
* @param resource {@link Resource} to be added to the interval specified.
|
||||
* @return true if addition is successful, false otherwise
|
||||
*/
|
||||
public boolean addInterval(ReservationInterval interval,
|
||||
Resource resource) {
|
||||
long startTime = interval.getStartTime();
|
||||
long endTime = interval.getEndTime();
|
||||
if (startTime >= 0 && endTime > startTime && endTime <= timePeriod) {
|
||||
return super.addInterval(interval, resource);
|
||||
} else {
|
||||
LOG.info("Cannot set capacity beyond end time: " + timePeriod);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Removes a resource for the specified interval.
|
||||
*
|
||||
* @param interval the {@link ReservationInterval} for which the resource is
|
||||
* to be removed.
|
||||
* @param resource the {@link Resource} to be removed.
|
||||
* @return true if removal is successful, false otherwise
|
||||
*/
|
||||
public boolean removeInterval(
|
||||
ReservationInterval interval, Resource resource) {
|
||||
long startTime = interval.getStartTime();
|
||||
long endTime = interval.getEndTime();
|
||||
// If the resource to be subtracted is less than the minimum resource in
|
||||
// the range, abort removal to avoid negative capacity.
|
||||
if (!Resources.fitsIn(
|
||||
resource, super.getMinimumCapacityInInterval(interval))) {
|
||||
LOG.info("Request to remove more resources than what is available");
|
||||
return false;
|
||||
}
|
||||
if (startTime >= 0 && endTime > startTime && endTime <= timePeriod) {
|
||||
return super.removeInterval(interval, resource);
|
||||
} else {
|
||||
LOG.info("Interval extends beyond the end time " + timePeriod);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get maximum capacity at periodic offsets from the specified time.
|
||||
*
|
||||
* @param tick UTC time base from which offsets are specified for finding
|
||||
* the maximum capacity.
|
||||
* @param period periodic offset at which capacities are evaluted.
|
||||
* @return the maximum {@link Resource} across the specified time instants.
|
||||
* @return true if removal is successful, false otherwise
|
||||
*/
|
||||
public Resource getMaximumPeriodicCapacity(long tick, long period) {
|
||||
Resource maxResource;
|
||||
if (period < timePeriod) {
|
||||
maxResource =
|
||||
super.getMaximumPeriodicCapacity(tick % timePeriod, period);
|
||||
} else {
|
||||
// if period is greater than the length of PeriodicRLESparseAllocation,
|
||||
// only a single value exists in this interval.
|
||||
maxResource = super.getCapacityAtTime(tick % timePeriod);
|
||||
}
|
||||
return maxResource;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get time period of PeriodicRLESparseResourceAllocation.
|
||||
*
|
||||
* @return timePeriod time period represented in ms.
|
||||
*/
|
||||
public long getTimePeriod() {
|
||||
return this.timePeriod;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
StringBuilder ret = new StringBuilder();
|
||||
ret.append("Period: ").append(timePeriod).append("\n")
|
||||
.append(super.toString());
|
||||
if (super.isEmpty()) {
|
||||
ret.append(" no allocations\n");
|
||||
}
|
||||
return ret.toString();
|
||||
}
|
||||
|
||||
}
|
@ -132,6 +132,7 @@ public boolean removeInterval(ReservationInterval reservationInterval,
|
||||
* Returns the capacity, i.e. total resources allocated at the specified point
|
||||
* of time.
|
||||
*
|
||||
* @param tick timeStap at which resource needs to be known
|
||||
* @return the resources allocated at the specified time
|
||||
*/
|
||||
public Resource getCapacityAtTime(long tick) {
|
||||
@ -309,6 +310,10 @@ public NavigableMap<Long, Resource> getCumulative() {
|
||||
}
|
||||
}
|
||||
|
||||
public ResourceCalculator getResourceCalculator() {
|
||||
return resourceCalculator;
|
||||
}
|
||||
|
||||
/**
|
||||
* Merges the range start to end of two {@code RLESparseResourceAllocation}
|
||||
* using a given {@code RLEOperator}.
|
||||
@ -533,4 +538,50 @@ public enum RLEOperator {
|
||||
add, subtract, min, max, subtractTestNonNegative
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the maximum capacity across specified time instances. The search-space
|
||||
* is specified using the starting value, tick, and the periodic interval for
|
||||
* search. Maximum resource allocation across tick, tick + period,
|
||||
* tick + 2 * period,..., tick + n * period .. is returned.
|
||||
*
|
||||
* @param tick the starting time instance
|
||||
* @param period interval at which capacity is evaluated
|
||||
* @return maximum resource allocation
|
||||
*/
|
||||
public Resource getMaximumPeriodicCapacity(long tick, long period) {
|
||||
Resource maxCapacity = ZERO_RESOURCE;
|
||||
if (!cumulativeCapacity.isEmpty()) {
|
||||
Long lastKey = cumulativeCapacity.lastKey();
|
||||
for (long t = tick; t <= lastKey; t = t + period) {
|
||||
maxCapacity = Resources.componentwiseMax(maxCapacity,
|
||||
cumulativeCapacity.floorEntry(t).getValue());
|
||||
}
|
||||
}
|
||||
return maxCapacity;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the minimum capacity in the specified time range.
|
||||
*
|
||||
* @param interval the {@link ReservationInterval} to be searched
|
||||
* @return minimum resource allocation
|
||||
*/
|
||||
public Resource getMinimumCapacityInInterval(ReservationInterval interval) {
|
||||
Resource minCapacity = Resource.newInstance(
|
||||
Integer.MAX_VALUE, Integer.MAX_VALUE);
|
||||
long start = interval.getStartTime();
|
||||
long end = interval.getEndTime();
|
||||
NavigableMap<Long, Resource> capacityRange =
|
||||
this.getRangeOverlapping(start, end).getCumulative();
|
||||
if (!capacityRange.isEmpty()) {
|
||||
for (Map.Entry<Long, Resource> entry : capacityRange.entrySet()) {
|
||||
if (entry.getValue() != null) {
|
||||
minCapacity = Resources.componentwiseMin(minCapacity,
|
||||
entry.getValue());
|
||||
}
|
||||
}
|
||||
}
|
||||
return minCapacity;
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -56,6 +56,7 @@
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
|
||||
import org.junit.Assert;
|
||||
import org.mockito.Mockito;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
@ -414,6 +415,19 @@ public static Map<ReservationInterval, Resource> generateAllocation(
|
||||
return req;
|
||||
}
|
||||
|
||||
public static RLESparseResourceAllocation
|
||||
generateRLESparseResourceAllocation(int[] alloc, long[] timeSteps) {
|
||||
TreeMap<Long, Resource> allocationsMap = new TreeMap<>();
|
||||
for (int i = 0; i < alloc.length; i++) {
|
||||
allocationsMap.put(timeSteps[i],
|
||||
Resource.newInstance(alloc[i], alloc[i]));
|
||||
}
|
||||
RLESparseResourceAllocation rleVector =
|
||||
new RLESparseResourceAllocation(allocationsMap,
|
||||
new DefaultResourceCalculator());
|
||||
return rleVector;
|
||||
}
|
||||
|
||||
public static Resource calculateClusterResource(int numContainers) {
|
||||
return Resource.newInstance(numContainers * 1024, numContainers);
|
||||
}
|
||||
|
@ -0,0 +1,142 @@
|
||||
/******************************************************************************
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*****************************************************************************/
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Testing the class PeriodicRLESparseResourceAllocation.
|
||||
*/
|
||||
public class TestPeriodicRLESparseResourceAllocation {
|
||||
|
||||
private static final Logger LOG = LoggerFactory
|
||||
.getLogger(TestPeriodicRLESparseResourceAllocation.class);
|
||||
|
||||
@Test
|
||||
public void testPeriodicCapacity() {
|
||||
int[] alloc = {10, 7, 5, 2, 0};
|
||||
long[] timeSteps = {0L, 5L, 10L, 15L, 19L};
|
||||
RLESparseResourceAllocation rleSparseVector =
|
||||
ReservationSystemTestUtil.generateRLESparseResourceAllocation(
|
||||
alloc, timeSteps);
|
||||
PeriodicRLESparseResourceAllocation periodicVector =
|
||||
new PeriodicRLESparseResourceAllocation(rleSparseVector, 20L);
|
||||
LOG.info(periodicVector.toString());
|
||||
Assert.assertEquals(Resource.newInstance(5, 5),
|
||||
periodicVector.getCapacityAtTime(10L));
|
||||
Assert.assertEquals(Resource.newInstance(10, 10),
|
||||
periodicVector.getCapacityAtTime(20L));
|
||||
Assert.assertEquals(Resource.newInstance(7, 7),
|
||||
periodicVector.getCapacityAtTime(27L));
|
||||
Assert.assertEquals(Resource.newInstance(5, 5),
|
||||
periodicVector.getCapacityAtTime(50L));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMaxPeriodicCapacity() {
|
||||
int[] alloc = {2, 5, 7, 10, 3, 4, 6, 8};
|
||||
long[] timeSteps = {0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L};
|
||||
RLESparseResourceAllocation rleSparseVector =
|
||||
ReservationSystemTestUtil.generateRLESparseResourceAllocation(
|
||||
alloc, timeSteps);
|
||||
PeriodicRLESparseResourceAllocation periodicVector =
|
||||
new PeriodicRLESparseResourceAllocation(rleSparseVector, 8L);
|
||||
LOG.info(periodicVector.toString());
|
||||
Assert.assertEquals(
|
||||
periodicVector.getMaximumPeriodicCapacity(0, 1),
|
||||
Resource.newInstance(10, 10));
|
||||
Assert.assertEquals(
|
||||
periodicVector.getMaximumPeriodicCapacity(8, 2),
|
||||
Resource.newInstance(7, 7));
|
||||
Assert.assertEquals(
|
||||
periodicVector.getMaximumPeriodicCapacity(16, 3),
|
||||
Resource.newInstance(10, 10));
|
||||
Assert.assertEquals(
|
||||
periodicVector.getMaximumPeriodicCapacity(17, 4),
|
||||
Resource.newInstance(5, 5));
|
||||
Assert.assertEquals(
|
||||
periodicVector.getMaximumPeriodicCapacity(32, 5),
|
||||
Resource.newInstance(4, 4));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSetCapacityInInterval() {
|
||||
int[] alloc = {2, 5, 0};
|
||||
long[] timeSteps = {1L, 2L, 3L};
|
||||
RLESparseResourceAllocation rleSparseVector =
|
||||
ReservationSystemTestUtil.generateRLESparseResourceAllocation(
|
||||
alloc, timeSteps);
|
||||
PeriodicRLESparseResourceAllocation periodicVector =
|
||||
new PeriodicRLESparseResourceAllocation(rleSparseVector, 10L);
|
||||
ReservationInterval interval = new ReservationInterval(5L, 10L);
|
||||
periodicVector.addInterval(
|
||||
interval, Resource.newInstance(8, 8));
|
||||
Assert.assertEquals(Resource.newInstance(8, 8),
|
||||
periodicVector.getCapacityAtTime(5L));
|
||||
Assert.assertEquals(Resource.newInstance(8, 8),
|
||||
periodicVector.getCapacityAtTime(9L));
|
||||
Assert.assertEquals(Resource.newInstance(0, 0),
|
||||
periodicVector.getCapacityAtTime(10L));
|
||||
Assert.assertEquals(Resource.newInstance(0, 0),
|
||||
periodicVector.getCapacityAtTime(0L));
|
||||
Assert.assertFalse(periodicVector.addInterval(
|
||||
new ReservationInterval(7L, 12L), Resource.newInstance(8, 8)));
|
||||
}
|
||||
|
||||
public void testRemoveInterval() {
|
||||
int[] alloc = {2, 5, 3, 4, 0};
|
||||
long[] timeSteps = {1L, 3L, 5L, 7L, 9L};
|
||||
RLESparseResourceAllocation rleSparseVector =
|
||||
ReservationSystemTestUtil.generateRLESparseResourceAllocation(
|
||||
alloc, timeSteps);
|
||||
PeriodicRLESparseResourceAllocation periodicVector =
|
||||
new PeriodicRLESparseResourceAllocation(rleSparseVector, 10L);
|
||||
ReservationInterval interval = new ReservationInterval(3L, 7L);
|
||||
Assert.assertTrue(periodicVector.removeInterval(
|
||||
interval, Resource.newInstance(3, 3)));
|
||||
Assert.assertEquals(Resource.newInstance(2, 2),
|
||||
periodicVector.getCapacityAtTime(1L));
|
||||
Assert.assertEquals(Resource.newInstance(2, 2),
|
||||
periodicVector.getCapacityAtTime(2L));
|
||||
Assert.assertEquals(Resource.newInstance(2, 2),
|
||||
periodicVector.getCapacityAtTime(3L));
|
||||
Assert.assertEquals(Resource.newInstance(2, 2),
|
||||
periodicVector.getCapacityAtTime(4L));
|
||||
Assert.assertEquals(Resource.newInstance(0, 0),
|
||||
periodicVector.getCapacityAtTime(5L));
|
||||
Assert.assertEquals(Resource.newInstance(0, 0),
|
||||
periodicVector.getCapacityAtTime(6L));
|
||||
Assert.assertEquals(Resource.newInstance(4, 4),
|
||||
periodicVector.getCapacityAtTime(7L));
|
||||
|
||||
// invalid interval
|
||||
Assert.assertFalse(periodicVector.removeInterval(
|
||||
new ReservationInterval(7L, 12L), Resource.newInstance(1, 1)));
|
||||
|
||||
// invalid capacity
|
||||
Assert.assertFalse(periodicVector.removeInterval(
|
||||
new ReservationInterval(2L, 4L), Resource.newInstance(8, 8)));
|
||||
|
||||
}
|
||||
|
||||
}
|
@ -524,7 +524,61 @@ public void testToIntervalMap() {
|
||||
}
|
||||
}
|
||||
|
||||
private void setupArrays(TreeMap<Long, Resource> a, TreeMap<Long, Resource> b) {
|
||||
@Test
|
||||
public void testMaxPeriodicCapacity() {
|
||||
long[] timeSteps = {0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L};
|
||||
int[] alloc = {2, 5, 7, 10, 3, 4, 6, 8};
|
||||
RLESparseResourceAllocation rleSparseVector =
|
||||
ReservationSystemTestUtil.generateRLESparseResourceAllocation(
|
||||
alloc, timeSteps);
|
||||
LOG.info(rleSparseVector.toString());
|
||||
Assert.assertEquals(
|
||||
rleSparseVector.getMaximumPeriodicCapacity(0, 1),
|
||||
Resource.newInstance(10, 10));
|
||||
Assert.assertEquals(
|
||||
rleSparseVector.getMaximumPeriodicCapacity(0, 2),
|
||||
Resource.newInstance(7, 7));
|
||||
Assert.assertEquals(
|
||||
rleSparseVector.getMaximumPeriodicCapacity(0, 3),
|
||||
Resource.newInstance(10, 10));
|
||||
Assert.assertEquals(
|
||||
rleSparseVector.getMaximumPeriodicCapacity(0, 4),
|
||||
Resource.newInstance(3, 3));
|
||||
Assert.assertEquals(
|
||||
rleSparseVector.getMaximumPeriodicCapacity(0, 5),
|
||||
Resource.newInstance(4, 4));
|
||||
Assert.assertEquals(
|
||||
rleSparseVector.getMaximumPeriodicCapacity(0, 5),
|
||||
Resource.newInstance(4, 4));
|
||||
Assert.assertEquals(
|
||||
rleSparseVector.getMaximumPeriodicCapacity(7, 5),
|
||||
Resource.newInstance(8, 8));
|
||||
Assert.assertEquals(
|
||||
rleSparseVector.getMaximumPeriodicCapacity(10, 3),
|
||||
Resource.newInstance(0, 0));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetMinimumCapacityInInterval() {
|
||||
long[] timeSteps = {0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L};
|
||||
int[] alloc = {2, 5, 7, 10, 3, 4, 0, 8};
|
||||
RLESparseResourceAllocation rleSparseVector =
|
||||
ReservationSystemTestUtil.generateRLESparseResourceAllocation(
|
||||
alloc, timeSteps);
|
||||
LOG.info(rleSparseVector.toString());
|
||||
Assert.assertEquals(
|
||||
rleSparseVector.getMinimumCapacityInInterval(
|
||||
new ReservationInterval(1L, 3L)), Resource.newInstance(5, 5));
|
||||
Assert.assertEquals(
|
||||
rleSparseVector.getMinimumCapacityInInterval(
|
||||
new ReservationInterval(2L, 5L)), Resource.newInstance(3, 3));
|
||||
Assert.assertEquals(
|
||||
rleSparseVector.getMinimumCapacityInInterval(
|
||||
new ReservationInterval(1L, 7L)), Resource.newInstance(0, 0));
|
||||
}
|
||||
|
||||
private void setupArrays(
|
||||
TreeMap<Long, Resource> a, TreeMap<Long, Resource> b) {
|
||||
a.put(10L, Resource.newInstance(5, 5));
|
||||
a.put(20L, Resource.newInstance(10, 10));
|
||||
a.put(30L, Resource.newInstance(15, 15));
|
||||
|
Loading…
Reference in New Issue
Block a user