diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 76cad7fc01..ebbf0f4f2e 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -1361,6 +1361,10 @@ Release 2.8.0 - UNRELEASED YARN-4643. Container recovery is broken with delegating container runtime (Sidharta Seethana via jlowe) + YARN-4617. LeafQueue#pendingOrderingPolicy should always use fixed ordering + policy instead of using same as active applications ordering policy. + (Rohith Sharma K S via jianhe) + Release 2.7.3 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml index c12377b36c..c640d9f79b 100644 --- a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml +++ b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml @@ -162,6 +162,10 @@ + + + + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index ca05fe9f1f..0cbb88d0ef 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -968,7 +968,8 @@ protected void getActivedAppDiagnosticMessage( // queue's resource usage for specific partition } - public boolean isAttemptRecovering() { + @Override + public boolean isRecovering() { return isAttemptRecovering; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 56e450247d..c625fae12c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -66,6 +66,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicyForPendingApps; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy; import org.apache.hadoop.yarn.server.utils.Lock; import org.apache.hadoop.yarn.server.utils.Lock.NoLock; @@ -96,9 +97,6 @@ public class LeafQueue extends AbstractCSQueue { private Priority defaultAppPriorityPerQueue; private OrderingPolicy pendingOrderingPolicy = null; - - // Always give preference to this while activating the application attempts. - private OrderingPolicy pendingOPForRecoveredApps = null; private volatile float minimumAllocationFactor; @@ -126,6 +124,7 @@ public class LeafQueue extends AbstractCSQueue { private Map> ignorePartitionExclusivityRMContainers = new HashMap<>(); + @SuppressWarnings({ "unchecked", "rawtypes" }) public LeafQueue(CapacitySchedulerContext cs, String queueName, CSQueue parent, CSQueue old) throws IOException { super(cs, queueName, parent, old); @@ -133,6 +132,9 @@ public LeafQueue(CapacitySchedulerContext cs, this.activeUsersManager = new ActiveUsersManager(metrics); + // One time initialization is enough since it is static ordering policy + this.pendingOrderingPolicy = new FifoOrderingPolicyForPendingApps(); + if(LOG.isDebugEnabled()) { LOG.debug("LeafQueue:" + " name=" + queueName + ", fullname=" + getQueuePath()); @@ -159,11 +161,7 @@ protected synchronized void setupQueueConfigs(Resource clusterResource) CapacitySchedulerConfiguration conf = csContext.getConfiguration(); setOrderingPolicy(conf.getOrderingPolicy(getQueuePath())); - setPendingAppsOrderingPolicy(conf - . getOrderingPolicy(getQueuePath())); - setPendingAppsOrderingPolicyRecovery(conf - . getOrderingPolicy(getQueuePath())); - + userLimit = conf.getUserLimit(getQueuePath()); userLimitFactor = conf.getUserLimitFactor(getQueuePath()); @@ -327,8 +325,7 @@ public synchronized int getNumApplications() { } public synchronized int getNumPendingApplications() { - return pendingOrderingPolicy.getNumSchedulableEntities() - + pendingOPForRecoveredApps.getNumSchedulableEntities(); + return pendingOrderingPolicy.getNumSchedulableEntities(); } public synchronized int getNumActiveApplications() { @@ -627,18 +624,9 @@ private synchronized void activateApplications() { calculateAndGetAMResourceLimitPerPartition(nodePartition); } - activateApplications(getPendingAppsOrderingPolicyRecovery() - .getAssignmentIterator(), userAmPartitionLimit); - - activateApplications( - getPendingAppsOrderingPolicy().getAssignmentIterator(), - userAmPartitionLimit); - } - - private synchronized void activateApplications( - Iterator fsApp, - Map userAmPartitionLimit) { - while (fsApp.hasNext()) { + for (Iterator fsApp = + getPendingAppsOrderingPolicy().getAssignmentIterator(); + fsApp.hasNext();) { FiCaSchedulerApp application = fsApp.next(); ApplicationId applicationId = application.getApplicationId(); @@ -740,11 +728,7 @@ private synchronized void addApplicationAttempt(FiCaSchedulerApp application, User user) { // Accept user.submitApplication(); - if (application.isAttemptRecovering()) { - getPendingAppsOrderingPolicyRecovery().addSchedulableEntity(application); - } else { - getPendingAppsOrderingPolicy().addSchedulableEntity(application); - } + getPendingAppsOrderingPolicy().addSchedulableEntity(application); applicationAttemptMap.put(application.getApplicationAttemptId(), application); // Activate applications @@ -784,11 +768,7 @@ public synchronized void removeApplicationAttempt( boolean wasActive = orderingPolicy.removeSchedulableEntity(application); if (!wasActive) { - if (application.isAttemptRecovering()) { - pendingOPForRecoveredApps.removeSchedulableEntity(application); - } else { - pendingOrderingPolicy.removeSchedulableEntity(application); - } + pendingOrderingPolicy.removeSchedulableEntity(application); } else { queueUsage.decAMUsed(partitionName, application.getAMResource(partitionName)); @@ -1539,18 +1519,16 @@ public void recoverContainer(Resource clusterResource, * Obtain (read-only) collection of pending applications. */ public Collection getPendingApplications() { - Collection pendingApps = - new ArrayList(); - pendingApps.addAll(pendingOPForRecoveredApps.getSchedulableEntities()); - pendingApps.addAll(pendingOrderingPolicy.getSchedulableEntities()); - return pendingApps; + return Collections.unmodifiableCollection(pendingOrderingPolicy + .getSchedulableEntities()); } /** * Obtain (read-only) collection of active applications. */ public Collection getApplications() { - return orderingPolicy.getSchedulableEntities(); + return Collections.unmodifiableCollection(orderingPolicy + .getSchedulableEntities()); } // Consider the headroom for each user in the queue. @@ -1587,10 +1565,6 @@ public synchronized Resource getTotalPendingResourcesConsideringUserLimit( @Override public synchronized void collectSchedulerApplications( Collection apps) { - for (FiCaSchedulerApp pendingApp : pendingOPForRecoveredApps - .getSchedulableEntities()) { - apps.add(pendingApp.getApplicationAttemptId()); - } for (FiCaSchedulerApp pendingApp : pendingOrderingPolicy .getSchedulableEntities()) { apps.add(pendingApp.getApplicationAttemptId()); @@ -1759,30 +1733,6 @@ public void decreaseContainer(Resource clusterResource, getPendingAppsOrderingPolicy() { return pendingOrderingPolicy; } - public synchronized void setPendingAppsOrderingPolicy( - OrderingPolicy pendingOrderingPolicy) { - if (null != this.pendingOrderingPolicy) { - pendingOrderingPolicy - .addAllSchedulableEntities(this.pendingOrderingPolicy - .getSchedulableEntities()); - } - this.pendingOrderingPolicy = pendingOrderingPolicy; - } - - public synchronized OrderingPolicy - getPendingAppsOrderingPolicyRecovery() { - return pendingOPForRecoveredApps; - } - - public synchronized void setPendingAppsOrderingPolicyRecovery( - OrderingPolicy pendingOrderingPolicyRecovery) { - if (null != this.pendingOPForRecoveredApps) { - pendingOrderingPolicyRecovery - .addAllSchedulableEntities(this.pendingOPForRecoveredApps - .getSchedulableEntities()); - } - this.pendingOPForRecoveredApps = pendingOrderingPolicyRecovery; - } /* * Holds shared values used by all applications in diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoOrderingPolicyForPendingApps.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoOrderingPolicyForPendingApps.java new file mode 100644 index 0000000000..0891289394 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoOrderingPolicyForPendingApps.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy; + +import java.util.*; + +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; + +/** + * This ordering policy is used for pending applications only. + * An OrderingPolicy which orders SchedulableEntities by + *
    + *
  • Recovering application + *
  • Priority of an application + *
  • Input order + *
+ *

+ * Example : If schedulableEntities with E1(true,1,1) E2(true,2,2) E3(true,3,3) + * E4(false,4,4) E5(false,4,5) are added. The ordering policy assignment + * iterator is in the order of E3(true,3,3) E2(true,2,2) E1(true,1,1) + * E5(false,5,5) E4(false,4,4) + */ +public class FifoOrderingPolicyForPendingApps + extends AbstractComparatorOrderingPolicy { + + public FifoOrderingPolicyForPendingApps() { + List> comparators = + new ArrayList>(); + comparators.add(new RecoveryComparator()); + comparators.add(new PriorityComparator()); + comparators.add(new FifoComparator()); + this.comparator = new CompoundComparator(comparators); + this.schedulableEntities = new TreeSet(comparator); + } + + @Override + public String getInfo() { + return "FifoOrderingPolicyForPendingApps"; + } + + @Override + public void configure(Map conf) { + } + + @Override + public void containerAllocated(S schedulableEntity, RMContainer r) { + } + + @Override + public void containerReleased(S schedulableEntity, RMContainer r) { + } + + @Override + public void demandUpdated(S schedulableEntity) { + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/RecoveryComparator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/RecoveryComparator.java new file mode 100644 index 0000000000..87f07e74f6 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/RecoveryComparator.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy; + +import java.util.Comparator; + +/** + * A Comparator which orders SchedulableEntities by isRecovering flag. + */ +public class RecoveryComparator implements Comparator { + @Override + public int compare(SchedulableEntity se1, SchedulableEntity se2) { + int val1 = se1.isRecovering() ? 1 : 0; + int val2 = se2.isRecovering() ? 1 : 0; + return val2 - val1; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/SchedulableEntity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/SchedulableEntity.java index 2ccb1cd3b1..41b83ce716 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/SchedulableEntity.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/SchedulableEntity.java @@ -18,15 +18,12 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy; -import java.util.*; - import org.apache.hadoop.yarn.api.records.Priority; -import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; /** - * A SchedulableEntity is a process to be scheduled, + * A SchedulableEntity is a process to be scheduled. * for example, an application / application attempt */ public interface SchedulableEntity { @@ -53,4 +50,9 @@ public interface SchedulableEntity { */ public Priority getPriority(); + /** + * Whether application was running before RM restart. + */ + public boolean isRecovering(); + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index 1922a35d34..42dcd6de5e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -2399,7 +2399,6 @@ public void testFifoAssignment() throws Exception { LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A)); a.setOrderingPolicy(new FifoOrderingPolicy()); - a.setPendingAppsOrderingPolicy(new FifoOrderingPolicy()); String host_0_0 = "127.0.0.1"; String rack_0 = "rack_0"; @@ -2549,7 +2548,6 @@ public void testFairAssignment() throws Exception { new FairOrderingPolicy(); a.setOrderingPolicy(schedulingOrder); - a.setPendingAppsOrderingPolicy(new FairOrderingPolicy()); String host_0_0 = "127.0.0.1"; String rack_0 = "rack_0"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/MockSchedulableEntity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/MockSchedulableEntity.java index bf4c98a554..4f251bf4e3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/MockSchedulableEntity.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/MockSchedulableEntity.java @@ -32,9 +32,17 @@ public class MockSchedulableEntity implements SchedulableEntity { private String id; private long serial = 0; private Priority priority; + private boolean isRecovering; public MockSchedulableEntity() { } + public MockSchedulableEntity(long serial, int priority, + boolean isRecovering) { + this.serial = serial; + this.priority = Priority.newInstance(priority); + this.isRecovering = isRecovering; + } + public void setId(String id) { this.id = id; } @@ -84,4 +92,13 @@ public Priority getPriority() { public void setApplicationPriority(Priority priority) { this.priority = priority; } + + @Override + public boolean isRecovering() { + return isRecovering; + } + + protected void setRecovering(boolean entityRecovering) { + this.isRecovering = entityRecovering; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFifoOrderingPolicyForPendingApps.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFifoOrderingPolicyForPendingApps.java new file mode 100644 index 0000000000..befa8e6c32 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFifoOrderingPolicyForPendingApps.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy; + +import java.util.*; + +import org.junit.Assert; +import org.junit.Test; + +public class TestFifoOrderingPolicyForPendingApps { + + @Test + public void testFifoOrderingPolicyForPendingApps() { + FifoOrderingPolicyForPendingApps policy = + new FifoOrderingPolicyForPendingApps(); + + MockSchedulableEntity r1 = new MockSchedulableEntity(); + MockSchedulableEntity r2 = new MockSchedulableEntity(); + + Assert.assertEquals(policy.getComparator().compare(r1, r2), 0); + + r1.setSerial(1); + r1.setRecovering(true); + Assert.assertEquals(policy.getComparator().compare(r1, r2), -1); + + r1.setRecovering(false); + r2.setSerial(2); + r2.setRecovering(true); + Assert.assertEquals(policy.getComparator().compare(r1, r2), 1); + } + + /** + * Entities submitted with E1-Recovering, E2-Recovering, E3-Recovering, E4-not + * recovering, E5-not recovering. + * Expected Iterator Output : E-3 E-2 E-1 E-5 E-4 + */ + @Test + public void testIterators() { + OrderingPolicy schedOrder = + new FifoOrderingPolicyForPendingApps(); + + MockSchedulableEntity msp1 = new MockSchedulableEntity(1, 1, true); + MockSchedulableEntity msp2 = new MockSchedulableEntity(2, 2, true); + MockSchedulableEntity msp3 = new MockSchedulableEntity(3, 3, true); + MockSchedulableEntity msp4 = new MockSchedulableEntity(4, 2, true); + MockSchedulableEntity msp5 = new MockSchedulableEntity(5, 5, false); + MockSchedulableEntity msp6 = new MockSchedulableEntity(6, 6, false); + MockSchedulableEntity msp7 = new MockSchedulableEntity(7, 5, false); + + schedOrder.addSchedulableEntity(msp1); + schedOrder.addSchedulableEntity(msp2); + schedOrder.addSchedulableEntity(msp3); + schedOrder.addSchedulableEntity(msp4); + schedOrder.addSchedulableEntity(msp5); + schedOrder.addSchedulableEntity(msp6); + schedOrder.addSchedulableEntity(msp7); + + // Assignment with serial id's are 3,2,4,1,6,5,7 + checkSerials(schedOrder.getAssignmentIterator(), new long[] { 3, 2, 4, 1, + 6, 5, 7 }); + + //Preemption, youngest to oldest + checkSerials(schedOrder.getPreemptionIterator(), new long[] { 7, 5, 6, 1, + 4, 2, 3 }); + } + + public void checkSerials(Iterator si, + long[] serials) { + for (int i = 0; i < serials.length; i++) { + Assert.assertEquals(si.next().getSerial(), serials[i]); + } + } +}