diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityOverTimePolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityOverTimePolicy.java index 07bee99820..bb1a4e8278 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityOverTimePolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityOverTimePolicy.java @@ -1,26 +1,21 @@ /******************************************************************************* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. *******************************************************************************/ package org.apache.hadoop.yarn.server.resourcemanager.reservation; -import java.util.Date; -import java.util.NavigableMap; -import java.util.TreeMap; - import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.ReservationId; @@ -28,9 +23,12 @@ import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation.RLEOperator; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningQuotaException; -import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ResourceOverCommitException; import org.apache.hadoop.yarn.util.resource.Resources; +import java.util.Map; +import java.util.NavigableMap; +import java.util.TreeMap; + /** * This policy enforces a time-extended notion of Capacity. In particular it * guarantees that the allocation received in input when combined with all @@ -39,11 +37,11 @@ * validWindow, the integral of the allocations for a user (sum of the currently * submitted allocation and all prior allocations for the user) does not exceed * validWindow * maxAvg. - * + * * This allows flexibility, in the sense that an allocation can instantaneously * use large portions of the available capacity, but prevents abuses by bounding * the average use over time. - * + * * By controlling maxInst, maxAvg, validWindow the administrator configuring * this policy can obtain a behavior ranging from instantaneously enforced * capacity (akin to existing queues), or fully flexible allocations (likely @@ -51,7 +49,7 @@ */ @LimitedPrivate("yarn") @Unstable -public class CapacityOverTimePolicy implements SharingPolicy { +public class CapacityOverTimePolicy extends NoOverCommitPolicy { private ReservationSchedulerConfiguration conf; private long validWindow; @@ -68,121 +66,153 @@ public void init(String reservationQueuePath, validWindow = this.conf.getReservationWindow(reservationQueuePath); maxInst = this.conf.getInstantaneousMaxCapacity(reservationQueuePath) / 100; maxAvg = this.conf.getAverageCapacity(reservationQueuePath) / 100; - }; + } + /** + * The validation algorithm walks over the RLE encoded allocation and + * checks that for all transition points (when the start or end of the + * checking window encounters a value in the RLE). At this point it + * checkes whether the integral computed exceeds the quota limit. Note that + * this might not find the exact time of a violation, but if a violation + * exists it will find it. The advantage is a much lower number of checks + * as compared to time-slot by time-slot checks. + * + * @param plan the plan to validate against + * @param reservation the reservation allocation to test. + * @throws PlanningException if the validation fails. + */ @Override public void validate(Plan plan, ReservationAllocation reservation) throws PlanningException { - // this is entire method invoked under a write-lock on the plan, no need - // to synchronize accesses to the plan further - // Try to verify whether there is already a reservation with this ID in - // the system (remove its contribution during validation to simulate a - // try-n-swap - // update). - ReservationAllocation oldReservation = + // rely on NoOverCommitPolicy to check for: 1) user-match, 2) physical + // cluster limits, and 3) maxInst (via override of available) + try { + super.validate(plan, reservation); + } catch (PlanningException p) { + //wrap it in proper quota exception + throw new PlanningQuotaException(p); + } + + //---- check for integral violations of capacity -------- + + // Gather a view of what to check (curr allocation of user, minus old + // version of this reservation, plus new version) + RLESparseResourceAllocation consumptionForUserOverTime = + plan.getConsumptionForUserOverTime(reservation.getUser(), + reservation.getStartTime() - validWindow, + reservation.getEndTime() + validWindow); + + ReservationAllocation old = plan.getReservationById(reservation.getReservationId()); + if (old != null) { + consumptionForUserOverTime = RLESparseResourceAllocation + .merge(plan.getResourceCalculator(), plan.getTotalCapacity(), + consumptionForUserOverTime, old.getResourcesOverTime(), + RLEOperator.add, reservation.getStartTime() - validWindow, + reservation.getEndTime() + validWindow); + } - long startTime = reservation.getStartTime(); - long endTime = reservation.getEndTime(); - long step = plan.getStep(); + RLESparseResourceAllocation resRLE = reservation.getResourcesOverTime(); - Resource planTotalCapacity = plan.getTotalCapacity(); + RLESparseResourceAllocation toCheck = RLESparseResourceAllocation + .merge(plan.getResourceCalculator(), plan.getTotalCapacity(), + consumptionForUserOverTime, resRLE, RLEOperator.add, Long.MIN_VALUE, + Long.MAX_VALUE); - Resource maxAvgRes = Resources.multiply(planTotalCapacity, maxAvg); - Resource maxInsRes = Resources.multiply(planTotalCapacity, maxInst); + NavigableMap integralUp = new TreeMap<>(); + NavigableMap integralDown = new TreeMap<>(); - // define variable that will store integral of resources (need diff class to - // avoid overflow issues for long/large allocations) + long prevTime = toCheck.getEarliestStartTime(); + IntegralResource prevResource = new IntegralResource(0L, 0L); IntegralResource runningTot = new IntegralResource(0L, 0L); - IntegralResource maxAllowed = new IntegralResource(maxAvgRes); - maxAllowed.multiplyBy(validWindow / step); - RLESparseResourceAllocation userCons = - plan.getConsumptionForUserOverTime(reservation.getUser(), startTime - - validWindow, endTime + validWindow); + // add intermediate points + Map temp = new TreeMap<>(); + for (Map.Entry pointToCheck : toCheck.getCumulative() + .entrySet()) { - // check that the resources offered to the user during any window of length - // "validWindow" overlapping this allocation are within maxAllowed - // also enforce instantaneous and physical constraints during this pass - for (long t = startTime - validWindow; t < endTime + validWindow; t += step) { + Long timeToCheck = pointToCheck.getKey(); + Resource resourceToCheck = pointToCheck.getValue(); - Resource currExistingAllocTot = plan.getTotalCommittedResources(t); - Resource currExistingAllocForUser = userCons.getCapacityAtTime(t); - Resource currNewAlloc = reservation.getResourcesAtTime(t); - Resource currOldAlloc = Resources.none(); - if (oldReservation != null) { - currOldAlloc = oldReservation.getResourcesAtTime(t); + Long nextPoint = toCheck.getCumulative().higherKey(timeToCheck); + if (nextPoint == null || toCheck.getCumulative().get(nextPoint) == null) { + continue; } - - // throw exception if the cluster is overcommitted - // tot_allocated - old + new > capacity - Resource inst = - Resources.subtract(Resources.add(currExistingAllocTot, currNewAlloc), - currOldAlloc); - if (Resources.greaterThan(plan.getResourceCalculator(), - planTotalCapacity, inst, planTotalCapacity)) { - throw new ResourceOverCommitException(" Resources at time " + t - + " would be overcommitted (" + inst + " over " - + plan.getTotalCapacity() + ") by accepting reservation: " - + reservation.getReservationId()); - } - - // throw exception if instantaneous limits are violated - // tot_alloc_to_this_user - old + new > inst_limit - if (Resources.greaterThan(plan.getResourceCalculator(), - planTotalCapacity, Resources.subtract( - Resources.add(currExistingAllocForUser, currNewAlloc), - currOldAlloc), maxInsRes)) { - throw new PlanningQuotaException("Instantaneous quota capacity " - + maxInst + " would be passed at time " + t - + " by accepting reservation: " + reservation.getReservationId()); - } - - // throw exception if the running integral of utilization over validWindow - // is violated. We perform a delta check, adding/removing instants at the - // boundary of the window from runningTot. - - // runningTot = previous_runningTot + currExistingAllocForUser + - // currNewAlloc - currOldAlloc - pastNewAlloc - pastOldAlloc; - - // Where: - // 1) currNewAlloc, currExistingAllocForUser represent the contribution of - // the instant in time added in this pass. - // 2) pastNewAlloc, pastOldAlloc are the contributions relative to time - // instants that are being retired from the the window - // 3) currOldAlloc is the contribution (if any) of the previous version of - // this reservation (the one we are updating) - - runningTot.add(currExistingAllocForUser); - runningTot.add(currNewAlloc); - runningTot.subtract(currOldAlloc); - - // expire contributions from instant in time before (t - validWindow) - if (t > startTime) { - Resource pastOldAlloc = userCons.getCapacityAtTime(t - validWindow); - Resource pastNewAlloc = reservation.getResourcesAtTime(t - validWindow); - - // runningTot = runningTot - pastExistingAlloc - pastNewAlloc; - runningTot.subtract(pastOldAlloc); - runningTot.subtract(pastNewAlloc); - } - - // check integral - // runningTot > maxAvg * validWindow - // NOTE: we need to use comparator of IntegralResource directly, as - // Resource and ResourceCalculator assume "int" amount of resources, - // which is not sufficient when comparing integrals (out-of-bound) - if (maxAllowed.compareTo(runningTot) < 0) { - throw new PlanningQuotaException( - "Integral (avg over time) quota capacity " + maxAvg - + " over a window of " + validWindow / 1000 + " seconds, " - + " would be passed at time " + t + "(" + new Date(t) - + ") by accepting reservation: " - + reservation.getReservationId()); + for (int i = 1; i <= (nextPoint - timeToCheck) / validWindow; i++) { + temp.put(timeToCheck + (i * validWindow), resourceToCheck); } } + temp.putAll(toCheck.getCumulative()); + + // compute point-wise integral for the up-fronts and down-fronts + for (Map.Entry currPoint : temp.entrySet()) { + + Long currTime = currPoint.getKey(); + Resource currResource = currPoint.getValue(); + + //add to running total current contribution + prevResource.multiplyBy(currTime - prevTime); + runningTot.add(prevResource); + integralUp.put(currTime, normalizeToResource(runningTot, validWindow)); + integralDown.put(currTime + validWindow, + normalizeToResource(runningTot, validWindow)); + + if (currResource != null) { + prevResource.memory = currResource.getMemorySize(); + prevResource.vcores = currResource.getVirtualCores(); + } else { + prevResource.memory = 0L; + prevResource.vcores = 0L; + } + prevTime = currTime; + } + + // compute final integral as delta of up minus down transitions + RLESparseResourceAllocation intUp = + new RLESparseResourceAllocation(integralUp, + plan.getResourceCalculator()); + RLESparseResourceAllocation intDown = + new RLESparseResourceAllocation(integralDown, + plan.getResourceCalculator()); + + RLESparseResourceAllocation integral = RLESparseResourceAllocation + .merge(plan.getResourceCalculator(), plan.getTotalCapacity(), intUp, + intDown, RLEOperator.subtract, Long.MIN_VALUE, Long.MAX_VALUE); + + // define over-time integral limit + // note: this is aligned with the normalization done above + NavigableMap tlimit = new TreeMap<>(); + Resource maxAvgRes = Resources.multiply(plan.getTotalCapacity(), maxAvg); + tlimit.put(toCheck.getEarliestStartTime() - validWindow, maxAvgRes); + RLESparseResourceAllocation targetLimit = + new RLESparseResourceAllocation(tlimit, plan.getResourceCalculator()); + + // compare using merge() limit with integral + try { + RLESparseResourceAllocation + .merge(plan.getResourceCalculator(), plan.getTotalCapacity(), + targetLimit, integral, RLEOperator.subtractTestNonNegative, + reservation.getStartTime() - validWindow, + reservation.getEndTime() + validWindow); + } catch (PlanningException p) { + throw new PlanningQuotaException( + "Integral (avg over time) quota capacity " + maxAvg + + " over a window of " + validWindow / 1000 + " seconds, " + + " would be exceeded by accepting reservation: " + reservation + .getReservationId(), p); + } + } + + private Resource normalizeToResource(IntegralResource runningTot, + long window) { + // normalize to fit in windows. Rounding should not impact more than + // sub 1 core average allocations. This will all be removed once + // Resource moves to long. + int memory = (int) Math.round((double) runningTot.memory / window); + int vcores = (int) Math.round((double) runningTot.vcores / window); + return Resource.newInstance(memory, vcores); } @Override @@ -208,21 +238,18 @@ public RLESparseResourceAllocation availableResources( // add back in old reservation used resources if any ReservationAllocation old = plan.getReservationById(oldId); if (old != null) { - used = - RLESparseResourceAllocation.merge(plan.getResourceCalculator(), - Resources.clone(plan.getTotalCapacity()), used, - old.getResourcesOverTime(), RLEOperator.subtract, start, end); + used = RLESparseResourceAllocation.merge(plan.getResourceCalculator(), + Resources.clone(plan.getTotalCapacity()), used, + old.getResourcesOverTime(), RLEOperator.subtract, start, end); } - instRLEQuota = - RLESparseResourceAllocation.merge(plan.getResourceCalculator(), - planTotalCapacity, instRLEQuota, used, RLEOperator.subtract, start, - end); + instRLEQuota = RLESparseResourceAllocation + .merge(plan.getResourceCalculator(), planTotalCapacity, instRLEQuota, + used, RLEOperator.subtract, start, end); - instRLEQuota = - RLESparseResourceAllocation.merge(plan.getResourceCalculator(), - planTotalCapacity, available, instRLEQuota, RLEOperator.min, start, - end); + instRLEQuota = RLESparseResourceAllocation + .merge(plan.getResourceCalculator(), planTotalCapacity, available, + instRLEQuota, RLEOperator.min, start, end); return instRLEQuota; } @@ -260,11 +287,20 @@ public void add(Resource r) { vcores += r.getVirtualCores(); } + public void add(IntegralResource r) { + memory += r.memory; + vcores += r.vcores; + } + public void subtract(Resource r) { memory -= r.getMemorySize(); vcores -= r.getVirtualCores(); } + public IntegralResource negate() { + return new IntegralResource(-memory, -vcores); + } + public void multiplyBy(long window) { memory = memory * window; vcores = vcores * window; @@ -282,8 +318,7 @@ public long compareTo(IntegralResource other) { public String toString() { return ""; } + } - - } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java index 4aed064eb7..2dee60c2f6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java @@ -30,7 +30,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningQuotaException; -import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ResourceOverCommitException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; @@ -117,6 +116,23 @@ public void testSimplePass() throws IOException, PlanningException { res, minAlloc), false)); } + @Test(expected = PlanningException.class) + public void testAllocationLargerThanValidWindow() throws IOException, + PlanningException { + // generate allocation that exceed the validWindow + int[] f = generateData(25*3600, (int) Math.ceil(0.69 * totCont)); + + ReservationDefinition rDef = + ReservationSystemTestUtil.createSimpleReservationDefinition( + initTime, initTime + f.length + 1, f.length); + assertTrue(plan.toString(), + plan.addReservation(new InMemoryReservationAllocation( + ReservationSystemTestUtil.getNewReservationId(), rDef, "u1", + "dedicated", initTime, initTime + f.length, + ReservationSystemTestUtil.generateAllocation(initTime, step, f), + res, minAlloc), false)); + } + @Test public void testSimplePass2() throws IOException, PlanningException { // generate allocation from single tenant that exceed avg momentarily but @@ -151,7 +167,7 @@ public void testMultiTenantPass() throws IOException, PlanningException { } } - @Test(expected = ResourceOverCommitException.class) + @Test(expected = PlanningQuotaException.class) public void testMultiTenantFail() throws IOException, PlanningException { // generate allocation from multiple tenants that exceed tot capacity int[] f = generateData(3600, (int) Math.ceil(0.25 * totCont));