YARN-7614. [RESERVATION] Support ListReservation APIs in Federation Router. (#4843)

This commit is contained in:
slfan1989 2022-09-13 03:33:21 +08:00 committed by GitHub
parent 65a027b112
commit 3ce353395b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 298 additions and 3 deletions

View File

@ -71,7 +71,7 @@ public final class RouterMetrics {
private MutableGaugeInt numGetContainerReportFailedRetrieved;
@Metric("# of getContainers failed to be retrieved")
private MutableGaugeInt numGetContainersFailedRetrieved;
@Metric("# of getContainers failed to be retrieved")
@Metric("# of listReservations failed to be retrieved")
private MutableGaugeInt numListReservationsFailedRetrieved;
@Metric("# of getResourceTypeInfo failed to be retrieved")
private MutableGaugeInt numGetResourceTypeInfo;
@ -105,6 +105,8 @@ public final class RouterMetrics {
private MutableGaugeInt numUpdateReservationFailedRetrieved;
@Metric("# of deleteReservation failed to be retrieved")
private MutableGaugeInt numDeleteReservationFailedRetrieved;
@Metric("# of listReservation failed to be retrieved")
private MutableGaugeInt numListReservationFailedRetrieved;
// Aggregate metrics are shared, and don't have to be looked up per call
@Metric("Total number of successful Submitted apps and latency(ms)")
@ -171,6 +173,8 @@ public final class RouterMetrics {
private MutableRate totalSucceededUpdateReservationRetrieved;
@Metric("Total number of successful Retrieved DeleteReservation and latency(ms)")
private MutableRate totalSucceededDeleteReservationRetrieved;
@Metric("Total number of successful Retrieved ListReservation and latency(ms)")
private MutableRate totalSucceededListReservationRetrieved;
/**
* Provide quantile counters for all latencies.
@ -207,6 +211,7 @@ public final class RouterMetrics {
private MutableQuantiles submitReservationLatency;
private MutableQuantiles updateReservationLatency;
private MutableQuantiles deleteReservationLatency;
private MutableQuantiles listReservationLatency;
private static volatile RouterMetrics instance = null;
private static MetricsRegistry registry;
@ -333,6 +338,10 @@ private RouterMetrics() {
deleteReservationLatency =
registry.newQuantiles("deleteReservationLatency",
"latency of delete reservation timeouts", "ops", "latency", 10);
listReservationLatency =
registry.newQuantiles("listReservationLatency",
"latency of list reservation timeouts", "ops", "latency", 10);
}
public static RouterMetrics getMetrics() {
@ -514,6 +523,11 @@ public long getNumSucceededDeleteReservationRetrieved() {
return totalSucceededDeleteReservationRetrieved.lastStat().numSamples();
}
@VisibleForTesting
public long getNumSucceededListReservationRetrieved() {
return totalSucceededListReservationRetrieved.lastStat().numSamples();
}
@VisibleForTesting
public double getLatencySucceededAppsCreated() {
return totalSucceededAppsCreated.lastStat().mean();
@ -674,6 +688,11 @@ public double getLatencySucceededDeleteReservationRetrieved() {
return totalSucceededDeleteReservationRetrieved.lastStat().mean();
}
@VisibleForTesting
public double getLatencySucceededListReservationRetrieved() {
return totalSucceededListReservationRetrieved.lastStat().mean();
}
@VisibleForTesting
public int getAppsFailedCreated() {
return numAppsFailedCreated.value();
@ -823,6 +842,10 @@ public int getDeleteReservationFailedRetrieved() {
return numDeleteReservationFailedRetrieved.value();
}
public int getListReservationFailedRetrieved() {
return numListReservationFailedRetrieved.value();
}
public void succeededAppsCreated(long duration) {
totalSucceededAppsCreated.add(duration);
getNewApplicationLatency.add(duration);
@ -983,6 +1006,11 @@ public void succeededDeleteReservationRetrieved(long duration) {
deleteReservationLatency.add(duration);
}
public void succeededListReservationRetrieved(long duration) {
totalSucceededListReservationRetrieved.add(duration);
listReservationLatency.add(duration);
}
public void incrAppsFailedCreated() {
numAppsFailedCreated.incr();
}
@ -1110,4 +1138,8 @@ public void incrUpdateReservationFailedRetrieved() {
public void incrDeleteReservationFailedRetrieved() {
numDeleteReservationFailedRetrieved.incr();
}
public void incrListReservationFailedRetrieved() {
numListReservationFailedRetrieved.incr();
}
}

View File

@ -53,6 +53,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@ -1483,7 +1484,34 @@ public Response deleteReservation(ReservationDeleteRequestInfo resContext,
public Response listReservation(String queue, String reservationId,
long startTime, long endTime, boolean includeResourceAllocations,
HttpServletRequest hsr) throws Exception {
throw new NotImplementedException("Code is not implemented");
if (queue == null || queue.isEmpty()) {
routerMetrics.incrListReservationFailedRetrieved();
throw new IllegalArgumentException("Parameter error, the queue is empty or null.");
}
if (reservationId == null || reservationId.isEmpty()) {
routerMetrics.incrListReservationFailedRetrieved();
throw new IllegalArgumentException("Parameter error, the reservationId is empty or null.");
}
try {
SubClusterInfo subClusterInfo = getHomeSubClusterInfoByReservationId(reservationId);
DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress());
HttpServletRequest hsrCopy = clone(hsr);
Response response = interceptor.listReservation(queue, reservationId, startTime, endTime,
includeResourceAllocations, hsrCopy);
if (response != null) {
return response;
}
} catch (YarnException e) {
routerMetrics.incrListReservationFailedRetrieved();
RouterServerUtil.logAndThrowRunTimeException("listReservation Failed.", e);
}
routerMetrics.incrListReservationFailedRetrieved();
throw new YarnException("listReservation Failed.");
}
@Override
@ -1808,6 +1836,31 @@ private SubClusterInfo getHomeSubClusterInfoByAppId(String appId)
throw new YarnException("Unable to get subCluster by applicationId = " + appId);
}
/**
* get the HomeSubCluster according to ReservationId.
*
* @param resId reservationId
* @return HomeSubCluster
* @throws YarnException on failure
*/
private SubClusterInfo getHomeSubClusterInfoByReservationId(String resId)
throws YarnException {
try {
ReservationId reservationId = ReservationId.parseReservationId(resId);
SubClusterId subClusterId = federationFacade.getReservationHomeSubCluster(reservationId);
if (subClusterId == null) {
RouterServerUtil.logAndThrowException(null,
"Can't get HomeSubCluster by reservationId %s", resId);
}
SubClusterInfo subClusterInfo = federationFacade.getSubCluster(subClusterId);
return subClusterInfo;
} catch (YarnException | IOException e) {
RouterServerUtil.logAndThrowException(e,
"Get HomeSubClusterInfo by reservationId %s failed.", resId);
}
throw new YarnException("Unable to get subCluster by reservationId = " + resId);
}
@VisibleForTesting
public LRUCacheHashMap<RouterAppInfoCacheKey, AppsInfo> getAppInfosCaches() {
return appInfosCaches;

View File

@ -473,6 +473,11 @@ public void getDeleteReservationFailed() {
LOG.info("Mocked: failed getDeleteReservationFailed call");
metrics.incrDeleteReservationFailedRetrieved();
}
public void getListReservationFailed() {
LOG.info("Mocked: failed getListReservationFailed call");
metrics.incrListReservationFailedRetrieved();
}
}
// Records successes for all calls
@ -643,6 +648,11 @@ public void getDeleteReservationRetrieved(long duration) {
LOG.info("Mocked: successful getDeleteReservation call with duration {}", duration);
metrics.succeededDeleteReservationRetrieved(duration);
}
public void getListReservationRetrieved(long duration) {
LOG.info("Mocked: successful getListReservation call with duration {}", duration);
metrics.succeededListReservationRetrieved(duration);
}
}
@Test
@ -1201,4 +1211,27 @@ public void testGetDeleteReservationRetrievedFailed() {
Assert.assertEquals(totalBadBefore + 1,
metrics.getDeleteReservationFailedRetrieved());
}
@Test
public void testGetListReservationRetrieved() {
long totalGoodBefore = metrics.getNumSucceededListReservationRetrieved();
goodSubCluster.getListReservationRetrieved(150);
Assert.assertEquals(totalGoodBefore + 1,
metrics.getNumSucceededListReservationRetrieved());
Assert.assertEquals(150,
metrics.getLatencySucceededListReservationRetrieved(), ASSERT_DOUBLE_DELTA);
goodSubCluster.getListReservationRetrieved(300);
Assert.assertEquals(totalGoodBefore + 2,
metrics.getNumSucceededListReservationRetrieved());
Assert.assertEquals(225,
metrics.getLatencySucceededListReservationRetrieved(), ASSERT_DOUBLE_DELTA);
}
@Test
public void testGetListReservationRetrievedFailed() {
long totalBadBefore = metrics.getListReservationFailedRetrieved();
badSubCluster.getListReservationFailed();
Assert.assertEquals(totalBadBefore + 1,
metrics.getListReservationFailedRetrieved());
}
}

View File

@ -38,9 +38,12 @@
import org.apache.commons.lang3.EnumUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.util.Sets;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@ -57,10 +60,18 @@
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
import org.apache.hadoop.yarn.api.records.ApplicationTimeout;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationListRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationListResponse;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
@ -70,6 +81,8 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityLevel;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
@ -97,6 +110,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.StatisticsItemInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationStatisticsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppActivitiesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationListInfo;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
@ -126,6 +140,19 @@ public class MockDefaultRequestInterceptorREST
private Map<ApplicationId, ApplicationReport> applicationMap = new HashMap<>();
public static final String APP_STATE_RUNNING = "RUNNING";
private static final String QUEUE_DEFAULT = "default";
private static final String QUEUE_DEFAULT_FULL = CapacitySchedulerConfiguration.ROOT +
CapacitySchedulerConfiguration.DOT + QUEUE_DEFAULT;
private static final String QUEUE_DEDICATED = "dedicated";
public static final String QUEUE_DEDICATED_FULL = CapacitySchedulerConfiguration.ROOT +
CapacitySchedulerConfiguration.DOT + QUEUE_DEDICATED;
// duration(milliseconds), 1mins
public static final long DURATION = 60*1000;
// Containers 4
public static final int NUM_CONTAINERS = 4;
private void validateRunning() throws ConnectException {
if (!isRunning) {
throw new ConnectException("RM is stopped");
@ -788,4 +815,79 @@ public AppActivitiesInfo getAppActivities(
return appActivitiesInfo;
}
@Override
public Response listReservation(String queue, String reservationId, long startTime, long endTime,
boolean includeResourceAllocations, HttpServletRequest hsr) throws Exception {
if (!isRunning) {
throw new RuntimeException("RM is stopped");
}
if (!StringUtils.equals(queue, QUEUE_DEDICATED_FULL)) {
throw new RuntimeException("The specified queue: " + queue +
" is not managed by reservation system." +
" Please try again with a valid reservable queue.");
}
MockRM mockRM = setupResourceManager();
ReservationId reservationID = ReservationId.parseReservationId(reservationId);
ReservationSystem reservationSystem = mockRM.getReservationSystem();
reservationSystem.synchronizePlan(QUEUE_DEDICATED_FULL, true);
// Generate reserved resources
ClientRMService clientService = mockRM.getClientRMService();
// arrival time from which the resource(s) can be allocated.
long arrival = Time.now();
// deadline by when the resource(s) must be allocated.
// The reason for choosing 1.05 is because this gives an integer
// DURATION * 0.05 = 3000(ms)
// deadline = arrival + 3000ms
long deadline = (long) (arrival + 1.05 * DURATION);
// In this test of reserved resources, we will apply for 4 containers (1 core, 1GB memory)
// arrival = Time.now(), and make sure deadline - arrival > duration,
// the current setting is greater than 3000ms
ReservationSubmissionRequest submissionRequest =
ReservationSystemTestUtil.createSimpleReservationRequest(
reservationID, NUM_CONTAINERS, arrival, deadline, DURATION);
clientService.submitReservation(submissionRequest);
// listReservations
ReservationListRequest request = ReservationListRequest.newInstance(
queue, reservationID.toString(), startTime, endTime, includeResourceAllocations);
ReservationListResponse resRespInfo = clientService.listReservations(request);
ReservationListInfo resResponse =
new ReservationListInfo(resRespInfo, includeResourceAllocations);
if (mockRM != null) {
mockRM.stop();
}
return Response.status(Status.OK).entity(resResponse).build();
}
private MockRM setupResourceManager() throws Exception {
DefaultMetricsSystem.setMiniClusterMode(true);
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
// Define default queue
conf.setCapacity(QUEUE_DEFAULT_FULL, 20);
// Define dedicated queues
conf.setQueues(CapacitySchedulerConfiguration.ROOT,
new String[] {QUEUE_DEFAULT, QUEUE_DEDICATED});
conf.setCapacity(QUEUE_DEDICATED_FULL, 80);
conf.setReservable(QUEUE_DEDICATED_FULL, true);
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, ResourceScheduler.class);
conf.setBoolean(YarnConfiguration.RM_RESERVATION_SYSTEM_ENABLE, true);
MockRM rm = new MockRM(conf);
rm.start();
rm.registerNode("127.0.0.1:5678", 100*1024, 100);
return rm;
}
}

View File

@ -28,9 +28,11 @@
import java.util.Collections;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@ -48,6 +50,8 @@
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
import org.apache.hadoop.yarn.server.federation.store.records.ReservationHomeSubCluster;
import org.apache.hadoop.yarn.server.federation.store.records.AddReservationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreTestUtil;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
@ -71,10 +75,15 @@
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.StatisticsItemInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppPriority;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppQueue;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationListInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.NodeIDsInfo;
import org.apache.hadoop.yarn.server.router.webapp.cache.RouterAppInfoCacheKey;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationStatisticsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppActivitiesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDefinitionInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationRequestsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationRequestInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
import org.apache.hadoop.yarn.util.LRUCacheHashMap;
@ -85,6 +94,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.hadoop.yarn.server.router.webapp.MockDefaultRequestInterceptorREST.QUEUE_DEDICATED_FULL;
/**
* Extends the {@code BaseRouterClientRMTest} and overrides methods in order to
* use the {@code RouterClientRMService} pipeline test cases for testing the
@ -1056,4 +1067,64 @@ public void testGetAppActivities() throws IOException, InterruptedException {
Assert.assertEquals(appId.toString(), appActivitiesInfo.getApplicationId());
Assert.assertEquals(10, appActivitiesInfo.getAllocations().size());
}
@Test
public void testListReservation() throws Exception {
// Add ReservationId In stateStore
ReservationId reservationId = ReservationId.newInstance(Time.now(), 1);
SubClusterId homeSubClusterId = subClusters.get(0);
ReservationHomeSubCluster reservationHomeSubCluster =
ReservationHomeSubCluster.newInstance(reservationId, homeSubClusterId);
AddReservationHomeSubClusterRequest request =
AddReservationHomeSubClusterRequest.newInstance(reservationHomeSubCluster);
stateStore.addReservationHomeSubCluster(request);
// Call the listReservation method
String applyReservationId = reservationId.toString();
Response listReservationResponse = interceptor.listReservation(
QUEUE_DEDICATED_FULL, applyReservationId, -1, -1, false, null);
Assert.assertNotNull(listReservationResponse);
Assert.assertNotNull(listReservationResponse.getStatus());
Status status = Status.fromStatusCode(listReservationResponse.getStatus());
Assert.assertEquals(Status.OK, status);
Object entity = listReservationResponse.getEntity();
Assert.assertNotNull(entity);
Assert.assertNotNull(entity instanceof ReservationListInfo);
ReservationListInfo listInfo = (ReservationListInfo) entity;
Assert.assertNotNull(listInfo);
List<ReservationInfo> reservationInfoList = listInfo.getReservations();
Assert.assertNotNull(reservationInfoList);
Assert.assertEquals(1, reservationInfoList.size());
ReservationInfo reservationInfo = reservationInfoList.get(0);
Assert.assertNotNull(reservationInfo);
Assert.assertEquals(applyReservationId, reservationInfo.getReservationId());
ReservationDefinitionInfo definitionInfo = reservationInfo.getReservationDefinition();
Assert.assertNotNull(definitionInfo);
ReservationRequestsInfo reservationRequestsInfo = definitionInfo.getReservationRequests();
Assert.assertNotNull(reservationRequestsInfo);
ArrayList<ReservationRequestInfo> reservationRequestInfoList =
reservationRequestsInfo.getReservationRequest();
Assert.assertNotNull(reservationRequestInfoList);
Assert.assertEquals(1, reservationRequestInfoList.size());
ReservationRequestInfo reservationRequestInfo = reservationRequestInfoList.get(0);
Assert.assertNotNull(reservationRequestInfo);
Assert.assertEquals(4, reservationRequestInfo.getNumContainers());
ResourceInfo resourceInfo = reservationRequestInfo.getCapability();
Assert.assertNotNull(resourceInfo);
int vCore = resourceInfo.getvCores();
long memory = resourceInfo.getMemorySize();
Assert.assertEquals(1, vCore);
Assert.assertEquals(1024, memory);
}
}

View File

@ -142,6 +142,8 @@
import com.sun.jersey.api.client.WebResource.Builder;
import net.jcip.annotations.NotThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.servlet.http.HttpServletRequest;
@ -157,6 +159,8 @@ public class TestRouterWebServicesREST {
/** The number of concurrent submissions for multi-thread test. */
private static final int NUM_THREADS_TESTS = 100;
private static final Logger LOG =
LoggerFactory.getLogger(TestRouterWebServicesREST.class);
private static String userName = "test";
@ -196,7 +200,7 @@ public Boolean get() {
}
return false;
}
}, 1000, 10 * 1000);
}, 1000, 20 * 1000);
} catch (Exception e) {
fail("Web app not running");
}