From b1cd88c598f39386987a0a63e5dd5c6359e1b2bd Mon Sep 17 00:00:00 2001 From: slfan1989 <55643692+slfan1989@users.noreply.github.com> Date: Sat, 29 Oct 2022 06:37:35 +0800 Subject: [PATCH] YARN-11229. [Federation] Add checkUserAccessToQueue REST APIs for Router. (#4929) --- .../webapp/dao/RMQueueAclInfo.java | 22 +++- .../yarn/server/router/RouterMetrics.java | 32 +++++ .../webapp/DefaultRequestInterceptorREST.java | 4 +- .../webapp/FederationInterceptorREST.java | 82 +++++++++++-- .../webapp/dao/FederationRMQueueAclInfo.java | 50 ++++++++ .../yarn/server/router/TestRouterMetrics.java | 33 +++++ .../MockDefaultRequestInterceptorREST.java | 116 ++++++++++++++++++ .../webapp/TestFederationInterceptorREST.java | 87 +++++++++++++ .../TestFederationInterceptorRESTRetry.java | 34 ++--- 9 files changed, 422 insertions(+), 38 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/dao/FederationRMQueueAclInfo.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/RMQueueAclInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/RMQueueAclInfo.java index 8799cab535..8432fd2ef3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/RMQueueAclInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/RMQueueAclInfo.java @@ -25,9 +25,10 @@ @XmlRootElement @XmlAccessorType(XmlAccessType.FIELD) public class RMQueueAclInfo { - protected boolean allowed; - protected String user; - protected String diagnostics; + private Boolean allowed; + private String user; + private String diagnostics; + private String subClusterId; public RMQueueAclInfo() { @@ -39,6 +40,13 @@ public RMQueueAclInfo(boolean allowed, String user, String diagnostics) { this.diagnostics = diagnostics; } + public RMQueueAclInfo(boolean allowed, String user, String diagnostics, String subClusterId) { + this.allowed = allowed; + this.user = user; + this.diagnostics = diagnostics; + this.subClusterId = subClusterId; + } + public boolean isAllowed() { return allowed; } @@ -62,4 +70,12 @@ public String getDiagnostics() { public void setDiagnostics(String diagnostics) { this.diagnostics = diagnostics; } + + public String getSubClusterId() { + return subClusterId; + } + + public void setSubClusterId(String subClusterId) { + this.subClusterId = subClusterId; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java index cc87c6f4c2..36ba1732ea 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java @@ -121,6 +121,8 @@ public final class RouterMetrics { private MutableGaugeInt numGetAppTimeoutFailedRetrieved; @Metric("# of getAppTimeouts failed to be retrieved") private MutableGaugeInt numGetAppTimeoutsFailedRetrieved; + @Metric("# of checkUserAccessToQueue failed to be retrieved") + private MutableGaugeInt numCheckUserAccessToQueueFailedRetrieved; // Aggregate metrics are shared, and don't have to be looked up per call @Metric("Total number of successful Submitted apps and latency(ms)") @@ -203,6 +205,8 @@ public final class RouterMetrics { private MutableRate totalSucceededGetAppTimeoutRetrieved; @Metric("Total number of successful Retrieved GetAppTimeouts and latency(ms)") private MutableRate totalSucceededGetAppTimeoutsRetrieved; + @Metric("Total number of successful Retrieved CheckUserAccessToQueue and latency(ms)") + private MutableRate totalSucceededCheckUserAccessToQueueRetrieved; /** * Provide quantile counters for all latencies. @@ -247,10 +251,12 @@ public final class RouterMetrics { private MutableQuantiles getUpdateQueueLatency; private MutableQuantiles getAppTimeoutLatency; private MutableQuantiles getAppTimeoutsLatency; + private MutableQuantiles checkUserAccessToQueueLatency; private static volatile RouterMetrics instance = null; private static MetricsRegistry registry; + @SuppressWarnings("checkstyle:MethodLength") private RouterMetrics() { registry = new MetricsRegistry(RECORD_INFO); registry.tag(RECORD_INFO, "Router"); @@ -398,6 +404,9 @@ private RouterMetrics() { getAppTimeoutsLatency = registry.newQuantiles("getAppTimeoutsLatency", "latency of get apptimeouts timeouts", "ops", "latency", 10); + + checkUserAccessToQueueLatency = registry.newQuantiles("checkUserAccessToQueueLatency", + "latency of get apptimeouts timeouts", "ops", "latency", 10); } public static RouterMetrics getMetrics() { @@ -619,6 +628,11 @@ public long getNumSucceededGetAppTimeoutsRetrieved() { return totalSucceededGetAppTimeoutsRetrieved.lastStat().numSamples(); } + @VisibleForTesting + public long getNumSucceededCheckUserAccessToQueueRetrievedRetrieved() { + return totalSucceededCheckUserAccessToQueueRetrieved.lastStat().numSamples(); + } + @VisibleForTesting public double getLatencySucceededAppsCreated() { return totalSucceededAppsCreated.lastStat().mean(); @@ -819,6 +833,11 @@ public double getLatencySucceededGetAppTimeoutsRetrieved() { return totalSucceededGetAppTimeoutsRetrieved.lastStat().mean(); } + @VisibleForTesting + public double getLatencySucceededCheckUserAccessToQueueRetrieved() { + return totalSucceededCheckUserAccessToQueueRetrieved.lastStat().mean(); + } + @VisibleForTesting public int getAppsFailedCreated() { return numAppsFailedCreated.value(); @@ -1000,6 +1019,10 @@ public int getAppTimeoutsFailedRetrieved() { return numGetAppTimeoutsFailedRetrieved.value(); } + public int getCheckUserAccessToQueueFailedRetrieved() { + return numCheckUserAccessToQueueFailedRetrieved.value(); + } + public void succeededAppsCreated(long duration) { totalSucceededAppsCreated.add(duration); getNewApplicationLatency.add(duration); @@ -1200,6 +1223,11 @@ public void succeededGetAppTimeoutsRetrieved(long duration) { getAppTimeoutsLatency.add(duration); } + public void succeededCheckUserAccessToQueueRetrieved(long duration) { + totalSucceededCheckUserAccessToQueueRetrieved.add(duration); + checkUserAccessToQueueLatency.add(duration); + } + public void incrAppsFailedCreated() { numAppsFailedCreated.incr(); } @@ -1359,4 +1387,8 @@ public void incrGetAppTimeoutFailedRetrieved() { public void incrGetAppTimeoutsFailedRetrieved() { numGetAppTimeoutsFailedRetrieved.incr(); } + + public void incrCheckUserAccessToQueueFailedRetrieved() { + numCheckUserAccessToQueueFailedRetrieved.incr(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/DefaultRequestInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/DefaultRequestInterceptorREST.java index c07056ce8a..0bdd87e4eb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/DefaultRequestInterceptorREST.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/DefaultRequestInterceptorREST.java @@ -526,10 +526,10 @@ public AppAttemptsInfo getAppAttempts(HttpServletRequest hsr, String appId) { @Override public RMQueueAclInfo checkUserAccessToQueue(String queue, String username, - String queueAclType, HttpServletRequest hsr) { + String queueAclType, HttpServletRequest hsr) throws AuthorizationException { return RouterWebServiceUtil.genericForward(webAppAddress, hsr, RMQueueAclInfo.class, HTTPMethods.GET, - RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.QUEUES + "/" + queue + RMWSConsts.RM_WEB_SERVICE_PATH + "/" + RMWSConsts.QUEUES + "/" + queue + "/access", null, null, getConf(), client); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java index e4edcaba7c..9ca4387a4c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java @@ -45,6 +45,7 @@ import org.apache.commons.lang3.NotImplementedException; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.authorize.AuthorizationException; import org.apache.hadoop.util.ReflectionUtils; @@ -104,6 +105,7 @@ import org.apache.hadoop.yarn.server.router.RouterServerUtil; import org.apache.hadoop.yarn.server.router.clientrm.ClientMethod; import org.apache.hadoop.yarn.server.router.webapp.cache.RouterAppInfoCacheKey; +import org.apache.hadoop.yarn.server.router.webapp.dao.FederationRMQueueAclInfo; import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo; import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo; import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo; @@ -1808,8 +1810,54 @@ public AppAttemptsInfo getAppAttempts(HttpServletRequest hsr, String appId) { @Override public RMQueueAclInfo checkUserAccessToQueue(String queue, String username, - String queueAclType, HttpServletRequest hsr) { - throw new NotImplementedException("Code is not implemented"); + String queueAclType, HttpServletRequest hsr) throws AuthorizationException { + + // Parameter Verification + if (queue == null || queue.isEmpty()) { + routerMetrics.incrCheckUserAccessToQueueFailedRetrieved(); + throw new IllegalArgumentException("Parameter error, the queue is empty or null."); + } + + if (username == null || username.isEmpty()) { + routerMetrics.incrCheckUserAccessToQueueFailedRetrieved(); + throw new IllegalArgumentException("Parameter error, the username is empty or null."); + } + + if (queueAclType == null || queueAclType.isEmpty()) { + routerMetrics.incrCheckUserAccessToQueueFailedRetrieved(); + throw new IllegalArgumentException("Parameter error, the queueAclType is empty or null."); + } + + // Traverse SubCluster and call checkUserAccessToQueue Api + try { + long startTime = Time.now(); + Map subClustersActive = getActiveSubclusters(); + final HttpServletRequest hsrCopy = clone(hsr); + Class[] argsClasses = new Class[]{String.class, String.class, String.class, + HttpServletRequest.class}; + Object[] args = new Object[]{queue, username, queueAclType, hsrCopy}; + ClientMethod remoteMethod = new ClientMethod("checkUserAccessToQueue", argsClasses, args); + Map rmQueueAclInfoMap = + invokeConcurrent(subClustersActive.values(), remoteMethod, RMQueueAclInfo.class); + FederationRMQueueAclInfo aclInfo = new FederationRMQueueAclInfo(); + rmQueueAclInfoMap.forEach((subClusterInfo, rMQueueAclInfo) -> { + SubClusterId subClusterId = subClusterInfo.getSubClusterId(); + rMQueueAclInfo.setSubClusterId(subClusterId.getId()); + aclInfo.getList().add(rMQueueAclInfo); + }); + long stopTime = Time.now(); + routerMetrics.succeededCheckUserAccessToQueueRetrieved(stopTime - startTime); + return aclInfo; + } catch (NotFoundException e) { + routerMetrics.incrCheckUserAccessToQueueFailedRetrieved(); + RouterServerUtil.logAndThrowRunTimeException("Get all active sub cluster(s) error.", e); + } catch (YarnException | IOException e) { + routerMetrics.incrCheckUserAccessToQueueFailedRetrieved(); + RouterServerUtil.logAndThrowRunTimeException("checkUserAccessToQueue error.", e); + } + + routerMetrics.incrCheckUserAccessToQueueFailedRetrieved(); + throw new RuntimeException("checkUserAccessToQueue error."); } @Override @@ -2024,9 +2072,14 @@ private Map invokeConcurrent(Collection c Map results = new HashMap<>(); // Send the requests in parallel - CompletionService compSvc = new ExecutorCompletionService<>(this.threadpool); + CompletionService> compSvc = + new ExecutorCompletionService<>(this.threadpool); - // Error Msg + // This part of the code should be able to expose the accessed Exception information. + // We use Pair to store related information. The left value of the Pair is the response, + // and the right value is the exception. + // If the request is normal, the response is not empty and the exception is empty; + // if the request is abnormal, the response is empty and the exception is not empty. for (final SubClusterInfo info : clusterIds) { compSvc.submit(() -> { DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster( @@ -2036,29 +2089,36 @@ private Map invokeConcurrent(Collection c getMethod(request.getMethodName(), request.getTypes()); Object retObj = method.invoke(interceptor, request.getParams()); R ret = clazz.cast(retObj); - return ret; + return Pair.of(ret, null); } catch (Exception e) { LOG.error("SubCluster {} failed to call {} method.", info.getSubClusterId(), request.getMethodName(), e); - return null; + return Pair.of(null, e); } }); } clusterIds.stream().forEach(clusterId -> { try { - Future future = compSvc.take(); - R response = future.get(); + Future> future = compSvc.take(); + Pair pair = future.get(); + R response = pair.getKey(); if (response != null) { results.put(clusterId, response); } + + Exception exception = pair.getRight(); + if (exception != null) { + throw exception; + } } catch (Throwable e) { String msg = String.format("SubCluster %s failed to %s report.", - clusterId, request.getMethodName()); - LOG.warn(msg, e); - throw new YarnRuntimeException(msg, e); + clusterId.getSubClusterId(), request.getMethodName()); + LOG.error(msg, e); + throw new YarnRuntimeException(e.getCause().getMessage(), e); } }); + return results; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/dao/FederationRMQueueAclInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/dao/FederationRMQueueAclInfo.java new file mode 100644 index 0000000000..4e61fd772e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/dao/FederationRMQueueAclInfo.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.router.webapp.dao; + +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.RMQueueAclInfo; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlElement; +import javax.xml.bind.annotation.XmlRootElement; +import java.util.ArrayList; +import java.util.List; + +@XmlRootElement +@XmlAccessorType(XmlAccessType.FIELD) +public class FederationRMQueueAclInfo extends RMQueueAclInfo { + + @XmlElement(name = "subCluster") + private List list = new ArrayList<>(); + + public FederationRMQueueAclInfo() { + } // JAXB needs this + + public FederationRMQueueAclInfo(ArrayList list) { + this.list = list; + } + + public List getList() { + return list; + } + + public void setList(List list) { + this.list = list; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java index 80649f0687..c74780089e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java @@ -513,6 +513,11 @@ public void getAppTimeoutsFailed() { LOG.info("Mocked: failed getAppTimeoutsFailed call"); metrics.incrGetAppTimeoutsFailedRetrieved(); } + + public void getCheckUserAccessToQueueRetrieved() { + LOG.info("Mocked: failed checkUserAccessToQueueRetrieved call"); + metrics.incrCheckUserAccessToQueueFailedRetrieved(); + } } // Records successes for all calls @@ -723,6 +728,11 @@ public void getAppTimeoutsRetrieved(long duration) { LOG.info("Mocked: successful getAppTimeouts call with duration {}", duration); metrics.succeededGetAppTimeoutsRetrieved(duration); } + + public void getCheckUserAccessToQueueRetrieved(long duration) { + LOG.info("Mocked: successful CheckUserAccessToQueue call with duration {}", duration); + metrics.succeededCheckUserAccessToQueueRetrieved(duration); + } } @Test @@ -1465,4 +1475,27 @@ public void testGetAppTimeoutsRetrievedFailed() { Assert.assertEquals(totalBadBefore + 1, metrics.getAppTimeoutsFailedRetrieved()); } + + @Test + public void testCheckUserAccessToQueueRetrievedRetrieved() { + long totalGoodBefore = metrics.getNumSucceededCheckUserAccessToQueueRetrievedRetrieved(); + goodSubCluster.getCheckUserAccessToQueueRetrieved(150); + Assert.assertEquals(totalGoodBefore + 1, + metrics.getNumSucceededCheckUserAccessToQueueRetrievedRetrieved()); + Assert.assertEquals(150, + metrics.getLatencySucceededCheckUserAccessToQueueRetrieved(), ASSERT_DOUBLE_DELTA); + goodSubCluster.getCheckUserAccessToQueueRetrieved(300); + Assert.assertEquals(totalGoodBefore + 2, + metrics.getNumSucceededCheckUserAccessToQueueRetrievedRetrieved()); + Assert.assertEquals(225, + metrics.getLatencySucceededCheckUserAccessToQueueRetrieved(), ASSERT_DOUBLE_DELTA); + } + + @Test + public void testCheckUserAccessToQueueRetrievedFailed() { + long totalBadBefore = metrics.getCheckUserAccessToQueueFailedRetrieved(); + badSubCluster.getCheckUserAccessToQueueRetrieved(); + Assert.assertEquals(totalBadBefore + 1, + metrics.getCheckUserAccessToQueueFailedRetrieved()); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java index 00ef92af4d..af7ecb6542 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.net.ConnectException; +import java.security.Principal; import java.util.ArrayList; import java.util.Set; import java.util.Map; @@ -32,14 +33,18 @@ import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; +import javax.ws.rs.core.Context; import javax.ws.rs.core.HttpHeaders; import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status; import org.apache.commons.lang3.EnumUtils; import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.AuthorizationException; +import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet; import org.apache.hadoop.util.Sets; import org.apache.hadoop.util.Time; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -60,6 +65,7 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType; import org.apache.hadoop.yarn.api.records.ApplicationTimeout; +import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; import org.apache.hadoop.yarn.api.protocolrecords.ReservationListRequest; import org.apache.hadoop.yarn.api.protocolrecords.ReservationListResponse; @@ -70,6 +76,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; @@ -111,6 +118,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationStatisticsInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppActivitiesInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationListInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.RMQueueAclInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebServices; import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo; import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo; @@ -118,12 +127,14 @@ import org.apache.hadoop.yarn.util.SystemClock; import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.webapp.BadRequestException; +import org.apache.hadoop.yarn.webapp.ForbiddenException; import org.apache.hadoop.yarn.webapp.NotFoundException; import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; /** * This class mocks the RESTRequestInterceptor. @@ -907,4 +918,109 @@ private MockRM setupResourceManager() throws Exception { rm.registerNode("127.0.0.1:5678", 100*1024, 100); return rm; } + + @Override + public RMQueueAclInfo checkUserAccessToQueue(String queue, String username, + String queueAclType, HttpServletRequest hsr) throws AuthorizationException { + + ResourceManager mockRM = mock(ResourceManager.class); + Configuration conf = new YarnConfiguration(); + + ResourceScheduler mockScheduler = new CapacityScheduler() { + @Override + public synchronized boolean checkAccess(UserGroupInformation callerUGI, + QueueACL acl, String queueName) { + if (acl == QueueACL.ADMINISTER_QUEUE) { + if (callerUGI.getUserName().equals("admin")) { + return true; + } + } else { + if (ImmutableSet.of("admin", "yarn").contains(callerUGI.getUserName())) { + return true; + } + } + return false; + } + }; + + when(mockRM.getResourceScheduler()).thenReturn(mockScheduler); + MockRMWebServices webSvc = new MockRMWebServices(mockRM, conf, mock(HttpServletResponse.class)); + return webSvc.checkUserAccessToQueue(queue, username, queueAclType, hsr); + } + + class MockRMWebServices { + + @Context + private HttpServletResponse httpServletResponse; + private ResourceManager resourceManager; + + private void initForReadableEndpoints() { + // clear content type + httpServletResponse.setContentType(null); + } + + MockRMWebServices(ResourceManager rm, Configuration conf, HttpServletResponse response) { + this.resourceManager = rm; + this.httpServletResponse = response; + } + + private UserGroupInformation getCallerUserGroupInformation( + HttpServletRequest hsr, boolean usePrincipal) { + + String remoteUser = hsr.getRemoteUser(); + + if (usePrincipal) { + Principal princ = hsr.getUserPrincipal(); + remoteUser = princ == null ? null : princ.getName(); + } + + UserGroupInformation callerUGI = null; + if (remoteUser != null) { + callerUGI = UserGroupInformation.createRemoteUser(remoteUser); + } + + return callerUGI; + } + + public RMQueueAclInfo checkUserAccessToQueue( + String queue, String username, String queueAclType, HttpServletRequest hsr) + throws AuthorizationException { + initForReadableEndpoints(); + + // For the user who invokes this REST call, he/she should have admin access + // to the queue. Otherwise we will reject the call. + UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true); + if (callerUGI != null && !this.resourceManager.getResourceScheduler().checkAccess( + callerUGI, QueueACL.ADMINISTER_QUEUE, queue)) { + throw new ForbiddenException( + "User=" + callerUGI.getUserName() + " doesn't haven access to queue=" + + queue + " so it cannot check ACLs for other users."); + } + + // Create UGI for the to-be-checked user. + UserGroupInformation user = UserGroupInformation.createRemoteUser(username); + if (user == null) { + throw new ForbiddenException( + "Failed to retrieve UserGroupInformation for user=" + username); + } + + // Check if the specified queue acl is valid. + QueueACL queueACL; + try { + queueACL = QueueACL.valueOf(queueAclType); + } catch (IllegalArgumentException e) { + throw new BadRequestException("Specified queueAclType=" + queueAclType + + " is not a valid type, valid queue acl types={" + + "SUBMIT_APPLICATIONS/ADMINISTER_QUEUE}"); + } + + if (!this.resourceManager.getResourceScheduler().checkAccess(user, queueACL, queue)) { + return new RMQueueAclInfo(false, user.getUserName(), + "User=" + username + " doesn't have access to queue=" + queue + + " with acl-type=" + queueAclType); + } + + return new RMQueueAclInfo(true, user.getUserName(), ""); + } + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java index 23bcb74cb1..c769148d8e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.router.webapp; import java.io.IOException; +import java.security.Principal; import java.util.ArrayList; import java.util.List; import java.util.HashMap; @@ -27,9 +28,11 @@ import java.util.HashSet; import java.util.Collections; +import javax.servlet.http.HttpServletRequest; import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status; +import org.apache.hadoop.security.authorize.AuthorizationException; import org.apache.hadoop.test.LambdaTestUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.http.HttpConfig; @@ -43,8 +46,10 @@ import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager; import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; @@ -81,6 +86,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppQueue; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationListInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.RMQueueAclInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.NodeIDsInfo; import org.apache.hadoop.yarn.server.router.webapp.cache.RouterAppInfoCacheKey; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationStatisticsInfo; @@ -90,6 +96,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationRequestInfo; import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo; import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo; +import org.apache.hadoop.yarn.server.router.webapp.dao.FederationRMQueueAclInfo; import org.apache.hadoop.yarn.util.LRUCacheHashMap; import org.apache.hadoop.yarn.util.MonotonicClock; import org.apache.hadoop.yarn.util.Times; @@ -100,6 +107,8 @@ import org.slf4j.LoggerFactory; import static org.apache.hadoop.yarn.server.router.webapp.MockDefaultRequestInterceptorREST.QUEUE_DEDICATED_FULL; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; /** * Extends the {@code BaseRouterClientRMTest} and overrides methods in order to @@ -1170,4 +1179,82 @@ public void testWebAddressWithScheme() { WebAppUtils.getHttpSchemePrefix(this.getConf()) + webAppAddress; Assert.assertEquals(expectedHttpsWebAddress, webAppAddressWithScheme2); } + + @Test + public void testCheckUserAccessToQueue() throws Exception { + + // Case 1: Only queue admin user can access other user's information + HttpServletRequest mockHsr = mockHttpServletRequestByUserName("non-admin"); + String errorMsg1 = "User=non-admin doesn't haven access to queue=queue " + + "so it cannot check ACLs for other users."; + LambdaTestUtils.intercept(YarnRuntimeException.class, errorMsg1, + () -> interceptor.checkUserAccessToQueue("queue", "jack", + QueueACL.SUBMIT_APPLICATIONS.name(), mockHsr)); + + // Case 2: request an unknown ACL causes BAD_REQUEST + HttpServletRequest mockHsr1 = mockHttpServletRequestByUserName("admin"); + String errorMsg2 = "Specified queueAclType=XYZ_ACL is not a valid type, " + + "valid queue acl types={SUBMIT_APPLICATIONS/ADMINISTER_QUEUE}"; + LambdaTestUtils.intercept(YarnRuntimeException.class, errorMsg2, + () -> interceptor.checkUserAccessToQueue("queue", "jack", "XYZ_ACL", mockHsr1)); + + // We design a test, admin user has ADMINISTER_QUEUE, SUBMIT_APPLICATIONS permissions, + // yarn user has SUBMIT_APPLICATIONS permissions, other users have no permissions + + // Case 3: get FORBIDDEN for rejected ACL + checkUserAccessToQueueFailed("queue", "jack", QueueACL.SUBMIT_APPLICATIONS, "admin"); + checkUserAccessToQueueFailed("queue", "jack", QueueACL.ADMINISTER_QUEUE, "admin"); + + // Case 4: get OK for listed ACLs + checkUserAccessToQueueSuccess("queue", "admin", QueueACL.ADMINISTER_QUEUE, "admin"); + checkUserAccessToQueueSuccess("queue", "admin", QueueACL.SUBMIT_APPLICATIONS, "admin"); + + // Case 5: get OK only for SUBMIT_APP acl for "yarn" user + checkUserAccessToQueueFailed("queue", "yarn", QueueACL.ADMINISTER_QUEUE, "admin"); + checkUserAccessToQueueSuccess("queue", "yarn", QueueACL.SUBMIT_APPLICATIONS, "admin"); + } + + private void checkUserAccessToQueueSuccess(String queue, String userName, + QueueACL queueACL, String mockUser) throws AuthorizationException { + HttpServletRequest mockHsr = mockHttpServletRequestByUserName(mockUser); + RMQueueAclInfo aclInfo = + interceptor.checkUserAccessToQueue(queue, userName, queueACL.name(), mockHsr); + Assert.assertNotNull(aclInfo); + Assert.assertTrue(aclInfo instanceof FederationRMQueueAclInfo); + FederationRMQueueAclInfo fedAclInfo = FederationRMQueueAclInfo.class.cast(aclInfo); + List aclInfos = fedAclInfo.getList(); + Assert.assertNotNull(aclInfos); + Assert.assertEquals(4, aclInfos.size()); + for (RMQueueAclInfo rMQueueAclInfo : aclInfos) { + Assert.assertTrue(rMQueueAclInfo.isAllowed()); + } + } + + private void checkUserAccessToQueueFailed(String queue, String userName, + QueueACL queueACL, String mockUser) throws AuthorizationException { + HttpServletRequest mockHsr = mockHttpServletRequestByUserName(mockUser); + RMQueueAclInfo aclInfo = + interceptor.checkUserAccessToQueue(queue, userName, queueACL.name(), mockHsr); + Assert.assertNotNull(aclInfo); + Assert.assertTrue(aclInfo instanceof FederationRMQueueAclInfo); + FederationRMQueueAclInfo fedAclInfo = FederationRMQueueAclInfo.class.cast(aclInfo); + List aclInfos = fedAclInfo.getList(); + Assert.assertNotNull(aclInfos); + Assert.assertEquals(4, aclInfos.size()); + for (RMQueueAclInfo rMQueueAclInfo : aclInfos) { + Assert.assertFalse(rMQueueAclInfo.isAllowed()); + String expectDiagnostics = "User=" + userName + + " doesn't have access to queue=queue with acl-type=" + queueACL.name(); + Assert.assertEquals(expectDiagnostics, rMQueueAclInfo.getDiagnostics()); + } + } + + private HttpServletRequest mockHttpServletRequestByUserName(String username) { + HttpServletRequest mockHsr = mock(HttpServletRequest.class); + when(mockHsr.getRemoteUser()).thenReturn(username); + Principal principal = mock(Principal.class); + when(principal.getName()).thenReturn(username); + when(mockHsr.getUserPrincipal()).thenReturn(principal); + return mockHsr; + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorRESTRetry.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorRESTRetry.java index b2f421e25a..e598a52f87 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorRESTRetry.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorRESTRetry.java @@ -25,9 +25,11 @@ import javax.ws.rs.core.Response; +import org.apache.hadoop.test.LambdaTestUtils; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils; import org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager; import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore; @@ -40,7 +42,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NewApplication; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeInfo; -import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodesInfo; import org.apache.hadoop.yarn.server.router.clientrm.PassThroughClientRequestInterceptor; import org.apache.hadoop.yarn.server.router.clientrm.TestableFederationClientInterceptor; import org.apache.hadoop.yarn.webapp.NotFoundException; @@ -377,15 +378,12 @@ public void testGetNodeOneBadOneGood() * composed of only 1 bad SubCluster. */ @Test - public void testGetNodesOneBadSC() - throws YarnException, IOException, InterruptedException { + public void testGetNodesOneBadSC() throws Exception { setupCluster(Arrays.asList(bad2)); - NodesInfo response = interceptor.getNodes(null); - Assert.assertNotNull(response); - Assert.assertEquals(0, response.getNodes().size()); - // The remove duplicate operations is tested in TestRouterWebServiceUtil + LambdaTestUtils.intercept(YarnRuntimeException.class, "RM is stopped", + () -> interceptor.getNodes(null)); } /** @@ -393,14 +391,12 @@ public void testGetNodesOneBadSC() * composed of only 2 bad SubClusters. */ @Test - public void testGetNodesTwoBadSCs() - throws YarnException, IOException, InterruptedException { + public void testGetNodesTwoBadSCs() throws Exception { + setupCluster(Arrays.asList(bad1, bad2)); - NodesInfo response = interceptor.getNodes(null); - Assert.assertNotNull(response); - Assert.assertEquals(0, response.getNodes().size()); - // The remove duplicate operations is tested in TestRouterWebServiceUtil + LambdaTestUtils.intercept(YarnRuntimeException.class, "RM is stopped", + () -> interceptor.getNodes(null)); } /** @@ -408,17 +404,11 @@ public void testGetNodesTwoBadSCs() * composed of only 1 bad SubCluster and a good one. */ @Test - public void testGetNodesOneBadOneGood() - throws YarnException, IOException, InterruptedException { + public void testGetNodesOneBadOneGood() throws Exception { setupCluster(Arrays.asList(good, bad2)); - NodesInfo response = interceptor.getNodes(null); - Assert.assertNotNull(response); - Assert.assertEquals(1, response.getNodes().size()); - // Check if the only node came from Good SubCluster - Assert.assertEquals(good.getId(), - Long.toString(response.getNodes().get(0).getLastHealthUpdate())); - // The remove duplicate operations is tested in TestRouterWebServiceUtil + LambdaTestUtils.intercept(YarnRuntimeException.class, "RM is stopped", + () -> interceptor.getNodes(null)); } /**