YARN-9474. Remove hard coded sleep from Opportunistic Scheduler tests. Contributed by Abhishek Modi.
This commit is contained in:
parent
4b4fef2f0e
commit
7a68e7abd5
@ -37,6 +37,8 @@ import org.apache.hadoop.yarn.util.resource.Resources;
|
|||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
@ -47,6 +49,8 @@ import java.util.Set;
|
|||||||
|
|
||||||
public class TestOpportunisticContainerAllocator {
|
public class TestOpportunisticContainerAllocator {
|
||||||
|
|
||||||
|
private static final Logger LOG =
|
||||||
|
LoggerFactory.getLogger(TestOpportunisticContainerAllocator.class);
|
||||||
private static final int GB = 1024;
|
private static final int GB = 1024;
|
||||||
private OpportunisticContainerAllocator allocator = null;
|
private OpportunisticContainerAllocator allocator = null;
|
||||||
private OpportunisticContainerContext oppCntxt = null;
|
private OpportunisticContainerContext oppCntxt = null;
|
||||||
@ -174,7 +178,7 @@ public class TestOpportunisticContainerAllocator {
|
|||||||
|
|
||||||
List<Container> containers = allocator.allocateContainers(
|
List<Container> containers = allocator.allocateContainers(
|
||||||
blacklistRequest, reqs, appAttId, oppCntxt, 1L, "luser");
|
blacklistRequest, reqs, appAttId, oppCntxt, 1L, "luser");
|
||||||
System.out.println(containers);
|
LOG.info("Containers: {}", containers);
|
||||||
Set<String> allocatedHosts = new HashSet<>();
|
Set<String> allocatedHosts = new HashSet<>();
|
||||||
for (Container c : containers) {
|
for (Container c : containers) {
|
||||||
allocatedHosts.add(c.getNodeHttpAddress());
|
allocatedHosts.add(c.getNodeHttpAddress());
|
||||||
@ -242,7 +246,7 @@ public class TestOpportunisticContainerAllocator {
|
|||||||
|
|
||||||
List<Container> containers = allocator.allocateContainers(
|
List<Container> containers = allocator.allocateContainers(
|
||||||
blacklistRequest, reqs, appAttId, oppCntxt, 1L, "luser");
|
blacklistRequest, reqs, appAttId, oppCntxt, 1L, "luser");
|
||||||
System.out.println(containers);
|
LOG.info("Containers: {}", containers);
|
||||||
Set<String> allocatedHosts = new HashSet<>();
|
Set<String> allocatedHosts = new HashSet<>();
|
||||||
for (Container c : containers) {
|
for (Container c : containers) {
|
||||||
allocatedHosts.add(c.getNodeHttpAddress());
|
allocatedHosts.add(c.getNodeHttpAddress());
|
||||||
@ -295,7 +299,7 @@ public class TestOpportunisticContainerAllocator {
|
|||||||
|
|
||||||
List<Container> containers = allocator.allocateContainers(
|
List<Container> containers = allocator.allocateContainers(
|
||||||
blacklistRequest, reqs, appAttId, oppCntxt, 1L, "luser");
|
blacklistRequest, reqs, appAttId, oppCntxt, 1L, "luser");
|
||||||
System.out.println(containers);
|
LOG.info("Containers: {}", containers);
|
||||||
Set<String> allocatedHosts = new HashSet<>();
|
Set<String> allocatedHosts = new HashSet<>();
|
||||||
for (Container c : containers) {
|
for (Container c : containers) {
|
||||||
allocatedHosts.add(c.getNodeHttpAddress());
|
allocatedHosts.add(c.getNodeHttpAddress());
|
||||||
@ -412,7 +416,7 @@ public class TestOpportunisticContainerAllocator {
|
|||||||
for (Container c : containers) {
|
for (Container c : containers) {
|
||||||
allocatedHosts.add(c.getNodeHttpAddress());
|
allocatedHosts.add(c.getNodeHttpAddress());
|
||||||
}
|
}
|
||||||
System.out.println(containers);
|
LOG.info("Containers: {}", containers);
|
||||||
Assert.assertTrue(allocatedHosts.contains("h2:1234"));
|
Assert.assertTrue(allocatedHosts.contains("h2:1234"));
|
||||||
Assert.assertTrue(allocatedHosts.contains("h5:1234"));
|
Assert.assertTrue(allocatedHosts.contains("h5:1234"));
|
||||||
Assert.assertFalse(allocatedHosts.contains("h3:1234"));
|
Assert.assertFalse(allocatedHosts.contains("h3:1234"));
|
||||||
@ -459,7 +463,7 @@ public class TestOpportunisticContainerAllocator {
|
|||||||
for (Container c : containers) {
|
for (Container c : containers) {
|
||||||
allocatedHosts.add(c.getNodeHttpAddress());
|
allocatedHosts.add(c.getNodeHttpAddress());
|
||||||
}
|
}
|
||||||
System.out.println(containers);
|
LOG.info("Containers: {}", containers);
|
||||||
Assert.assertTrue(allocatedHosts.contains("h2:1234"));
|
Assert.assertTrue(allocatedHosts.contains("h2:1234"));
|
||||||
Assert.assertTrue(allocatedHosts.contains("h5:1234"));
|
Assert.assertTrue(allocatedHosts.contains("h5:1234"));
|
||||||
Assert.assertFalse(allocatedHosts.contains("h3:1234"));
|
Assert.assertFalse(allocatedHosts.contains("h3:1234"));
|
||||||
@ -502,7 +506,7 @@ public class TestOpportunisticContainerAllocator {
|
|||||||
|
|
||||||
List<Container> containers = allocator.allocateContainers(
|
List<Container> containers = allocator.allocateContainers(
|
||||||
blacklistRequest, reqs, appAttId, oppCntxt, 1L, "luser");
|
blacklistRequest, reqs, appAttId, oppCntxt, 1L, "luser");
|
||||||
System.out.println(containers);
|
LOG.info("Containers: {}", containers);
|
||||||
Assert.assertEquals(2, containers.size());
|
Assert.assertEquals(2, containers.size());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -94,8 +94,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSc
|
|||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
|
||||||
.FifoScheduler;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
||||||
import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext;
|
import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext;
|
||||||
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
|
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
|
||||||
@ -106,17 +105,12 @@ import org.junit.Before;
|
|||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.mockito.Mockito;
|
import org.mockito.Mockito;
|
||||||
|
|
||||||
import com.google.common.base.Supplier;
|
|
||||||
|
|
||||||
import static org.junit.Assert.fail;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.TimeoutException;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test cases for {@link OpportunisticContainerAllocatorAMService}.
|
* Test cases for {@link OpportunisticContainerAllocatorAMService}.
|
||||||
@ -223,7 +217,8 @@ public class TestOpportunisticContainerAllocatorAMService {
|
|||||||
nm3.nodeHeartbeat(true);
|
nm3.nodeHeartbeat(true);
|
||||||
nm4.nodeHeartbeat(true);
|
nm4.nodeHeartbeat(true);
|
||||||
|
|
||||||
Thread.sleep(1000);
|
GenericTestUtils.waitFor(() ->
|
||||||
|
amservice.getLeastLoadedNodes().size() == 4, 10, 10 * 100);
|
||||||
|
|
||||||
QueueMetrics metrics = ((CapacityScheduler) scheduler).getRootQueue()
|
QueueMetrics metrics = ((CapacityScheduler) scheduler).getRootQueue()
|
||||||
.getMetrics();
|
.getMetrics();
|
||||||
@ -388,7 +383,8 @@ public class TestOpportunisticContainerAllocatorAMService {
|
|||||||
nm1.nodeHeartbeat(true);
|
nm1.nodeHeartbeat(true);
|
||||||
nm2.nodeHeartbeat(true);
|
nm2.nodeHeartbeat(true);
|
||||||
|
|
||||||
Thread.sleep(1000);
|
GenericTestUtils.waitFor(() ->
|
||||||
|
amservice.getLeastLoadedNodes().size() == 2, 10, 10 * 100);
|
||||||
|
|
||||||
QueueMetrics metrics = ((CapacityScheduler) scheduler).getRootQueue()
|
QueueMetrics metrics = ((CapacityScheduler) scheduler).getRootQueue()
|
||||||
.getMetrics();
|
.getMetrics();
|
||||||
@ -513,7 +509,8 @@ public class TestOpportunisticContainerAllocatorAMService {
|
|||||||
nm1.nodeHeartbeat(true);
|
nm1.nodeHeartbeat(true);
|
||||||
nm2.nodeHeartbeat(true);
|
nm2.nodeHeartbeat(true);
|
||||||
|
|
||||||
Thread.sleep(1000);
|
GenericTestUtils.waitFor(() ->
|
||||||
|
amservice.getLeastLoadedNodes().size() == 2, 10, 10 * 100);
|
||||||
|
|
||||||
QueueMetrics metrics = ((CapacityScheduler) scheduler).getRootQueue()
|
QueueMetrics metrics = ((CapacityScheduler) scheduler).getRootQueue()
|
||||||
.getMetrics();
|
.getMetrics();
|
||||||
@ -618,7 +615,9 @@ public class TestOpportunisticContainerAllocatorAMService {
|
|||||||
amservice.handle(new NodeUpdateSchedulerEvent(rmNode1));
|
amservice.handle(new NodeUpdateSchedulerEvent(rmNode1));
|
||||||
|
|
||||||
nm1.nodeHeartbeat(true);
|
nm1.nodeHeartbeat(true);
|
||||||
Thread.sleep(1000);
|
|
||||||
|
GenericTestUtils.waitFor(() ->
|
||||||
|
amservice.getLeastLoadedNodes().size() == 1, 10, 10 * 100);
|
||||||
|
|
||||||
AllocateResponse allocateResponse = am1.allocate(Arrays.asList(
|
AllocateResponse allocateResponse = am1.allocate(Arrays.asList(
|
||||||
ResourceRequest.newInstance(Priority.newInstance(1), "*",
|
ResourceRequest.newInstance(Priority.newInstance(1), "*",
|
||||||
@ -801,6 +800,9 @@ public class TestOpportunisticContainerAllocatorAMService {
|
|||||||
nm1.nodeHeartbeat(true);
|
nm1.nodeHeartbeat(true);
|
||||||
nm2.nodeHeartbeat(true);
|
nm2.nodeHeartbeat(true);
|
||||||
|
|
||||||
|
GenericTestUtils.waitFor(() ->
|
||||||
|
amservice.getLeastLoadedNodes().size() == 2, 10, 10 * 100);
|
||||||
|
|
||||||
AllocateResponse allocateResponse = am1.allocate(Arrays.asList(
|
AllocateResponse allocateResponse = am1.allocate(Arrays.asList(
|
||||||
ResourceRequest.newInstance(Priority.newInstance(1), "*",
|
ResourceRequest.newInstance(Priority.newInstance(1), "*",
|
||||||
Resources.createResource(1 * GB), 2, true, null,
|
Resources.createResource(1 * GB), 2, true, null,
|
||||||
@ -964,15 +966,10 @@ public class TestOpportunisticContainerAllocatorAMService {
|
|||||||
// Send add and update node events to AM Service.
|
// Send add and update node events to AM Service.
|
||||||
amservice.handle(new NodeAddedSchedulerEvent(rmNode1));
|
amservice.handle(new NodeAddedSchedulerEvent(rmNode1));
|
||||||
amservice.handle(new NodeUpdateSchedulerEvent(rmNode1));
|
amservice.handle(new NodeUpdateSchedulerEvent(rmNode1));
|
||||||
try {
|
|
||||||
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
GenericTestUtils.waitFor(() ->
|
||||||
@Override public Boolean get() {
|
scheduler.getNumClusterNodes() == 1, 10, 200 * 100);
|
||||||
return scheduler.getNumClusterNodes() == 1;
|
|
||||||
}
|
|
||||||
}, 10, 200 * 100);
|
|
||||||
}catch (TimeoutException e) {
|
|
||||||
fail("timed out while waiting for NM to add.");
|
|
||||||
}
|
|
||||||
AllocateResponse allocateResponse = am.allocate(
|
AllocateResponse allocateResponse = am.allocate(
|
||||||
Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1),
|
Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1),
|
||||||
"*", Resources.createResource(1 * GB), 2, true, null,
|
"*", Resources.createResource(1 * GB), 2, true, null,
|
||||||
@ -983,15 +980,10 @@ public class TestOpportunisticContainerAllocatorAMService {
|
|||||||
.getAllocatedContainers();
|
.getAllocatedContainers();
|
||||||
Container container = allocatedContainers.get(0);
|
Container container = allocatedContainers.get(0);
|
||||||
scheduler.handle(new NodeRemovedSchedulerEvent(rmNode1));
|
scheduler.handle(new NodeRemovedSchedulerEvent(rmNode1));
|
||||||
try {
|
|
||||||
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
GenericTestUtils.waitFor(() ->
|
||||||
@Override public Boolean get() {
|
scheduler.getNumClusterNodes() == 0, 10, 200 * 100);
|
||||||
return scheduler.getNumClusterNodes() == 0;
|
|
||||||
}
|
|
||||||
}, 10, 200 * 100);
|
|
||||||
}catch (TimeoutException e) {
|
|
||||||
fail("timed out while waiting for NM to remove.");
|
|
||||||
}
|
|
||||||
//test YARN-9165
|
//test YARN-9165
|
||||||
RMContainer rmContainer = null;
|
RMContainer rmContainer = null;
|
||||||
rmContainer = SchedulerUtils.createOpportunisticRmContainer(
|
rmContainer = SchedulerUtils.createOpportunisticRmContainer(
|
||||||
@ -1002,7 +994,6 @@ public class TestOpportunisticContainerAllocatorAMService {
|
|||||||
schedulerAttempt.getApplicationAttemptId(), container.getNodeId(),
|
schedulerAttempt.getApplicationAttemptId(), container.getNodeId(),
|
||||||
schedulerAttempt.getUser(), rm.getRMContext(), true);
|
schedulerAttempt.getUser(), rm.getRMContext(), true);
|
||||||
}
|
}
|
||||||
assert(rmContainer!=null);
|
|
||||||
//test YARN-9164
|
//test YARN-9164
|
||||||
schedulerAttempt.addRMContainer(container.getId(), rmContainer);
|
schedulerAttempt.addRMContainer(container.getId(), rmContainer);
|
||||||
scheduler.handle(new AppAttemptRemovedSchedulerEvent(attemptId,
|
scheduler.handle(new AppAttemptRemovedSchedulerEvent(attemptId,
|
||||||
|
Loading…
x
Reference in New Issue
Block a user