YARN-11229. [Federation] Add checkUserAccessToQueue REST APIs for Router. (#4929)
This commit is contained in:
parent
bfb84cd7f6
commit
b1cd88c598
@ -25,9 +25,10 @@
|
||||
@XmlRootElement
|
||||
@XmlAccessorType(XmlAccessType.FIELD)
|
||||
public class RMQueueAclInfo {
|
||||
protected boolean allowed;
|
||||
protected String user;
|
||||
protected String diagnostics;
|
||||
private Boolean allowed;
|
||||
private String user;
|
||||
private String diagnostics;
|
||||
private String subClusterId;
|
||||
|
||||
public RMQueueAclInfo() {
|
||||
|
||||
@ -39,6 +40,13 @@ public RMQueueAclInfo(boolean allowed, String user, String diagnostics) {
|
||||
this.diagnostics = diagnostics;
|
||||
}
|
||||
|
||||
public RMQueueAclInfo(boolean allowed, String user, String diagnostics, String subClusterId) {
|
||||
this.allowed = allowed;
|
||||
this.user = user;
|
||||
this.diagnostics = diagnostics;
|
||||
this.subClusterId = subClusterId;
|
||||
}
|
||||
|
||||
public boolean isAllowed() {
|
||||
return allowed;
|
||||
}
|
||||
@ -62,4 +70,12 @@ public String getDiagnostics() {
|
||||
public void setDiagnostics(String diagnostics) {
|
||||
this.diagnostics = diagnostics;
|
||||
}
|
||||
|
||||
public String getSubClusterId() {
|
||||
return subClusterId;
|
||||
}
|
||||
|
||||
public void setSubClusterId(String subClusterId) {
|
||||
this.subClusterId = subClusterId;
|
||||
}
|
||||
}
|
||||
|
@ -121,6 +121,8 @@ public final class RouterMetrics {
|
||||
private MutableGaugeInt numGetAppTimeoutFailedRetrieved;
|
||||
@Metric("# of getAppTimeouts failed to be retrieved")
|
||||
private MutableGaugeInt numGetAppTimeoutsFailedRetrieved;
|
||||
@Metric("# of checkUserAccessToQueue failed to be retrieved")
|
||||
private MutableGaugeInt numCheckUserAccessToQueueFailedRetrieved;
|
||||
|
||||
// Aggregate metrics are shared, and don't have to be looked up per call
|
||||
@Metric("Total number of successful Submitted apps and latency(ms)")
|
||||
@ -203,6 +205,8 @@ public final class RouterMetrics {
|
||||
private MutableRate totalSucceededGetAppTimeoutRetrieved;
|
||||
@Metric("Total number of successful Retrieved GetAppTimeouts and latency(ms)")
|
||||
private MutableRate totalSucceededGetAppTimeoutsRetrieved;
|
||||
@Metric("Total number of successful Retrieved CheckUserAccessToQueue and latency(ms)")
|
||||
private MutableRate totalSucceededCheckUserAccessToQueueRetrieved;
|
||||
|
||||
/**
|
||||
* Provide quantile counters for all latencies.
|
||||
@ -247,10 +251,12 @@ public final class RouterMetrics {
|
||||
private MutableQuantiles getUpdateQueueLatency;
|
||||
private MutableQuantiles getAppTimeoutLatency;
|
||||
private MutableQuantiles getAppTimeoutsLatency;
|
||||
private MutableQuantiles checkUserAccessToQueueLatency;
|
||||
|
||||
private static volatile RouterMetrics instance = null;
|
||||
private static MetricsRegistry registry;
|
||||
|
||||
@SuppressWarnings("checkstyle:MethodLength")
|
||||
private RouterMetrics() {
|
||||
registry = new MetricsRegistry(RECORD_INFO);
|
||||
registry.tag(RECORD_INFO, "Router");
|
||||
@ -398,6 +404,9 @@ private RouterMetrics() {
|
||||
|
||||
getAppTimeoutsLatency = registry.newQuantiles("getAppTimeoutsLatency",
|
||||
"latency of get apptimeouts timeouts", "ops", "latency", 10);
|
||||
|
||||
checkUserAccessToQueueLatency = registry.newQuantiles("checkUserAccessToQueueLatency",
|
||||
"latency of get apptimeouts timeouts", "ops", "latency", 10);
|
||||
}
|
||||
|
||||
public static RouterMetrics getMetrics() {
|
||||
@ -619,6 +628,11 @@ public long getNumSucceededGetAppTimeoutsRetrieved() {
|
||||
return totalSucceededGetAppTimeoutsRetrieved.lastStat().numSamples();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public long getNumSucceededCheckUserAccessToQueueRetrievedRetrieved() {
|
||||
return totalSucceededCheckUserAccessToQueueRetrieved.lastStat().numSamples();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public double getLatencySucceededAppsCreated() {
|
||||
return totalSucceededAppsCreated.lastStat().mean();
|
||||
@ -819,6 +833,11 @@ public double getLatencySucceededGetAppTimeoutsRetrieved() {
|
||||
return totalSucceededGetAppTimeoutsRetrieved.lastStat().mean();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public double getLatencySucceededCheckUserAccessToQueueRetrieved() {
|
||||
return totalSucceededCheckUserAccessToQueueRetrieved.lastStat().mean();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public int getAppsFailedCreated() {
|
||||
return numAppsFailedCreated.value();
|
||||
@ -1000,6 +1019,10 @@ public int getAppTimeoutsFailedRetrieved() {
|
||||
return numGetAppTimeoutsFailedRetrieved.value();
|
||||
}
|
||||
|
||||
public int getCheckUserAccessToQueueFailedRetrieved() {
|
||||
return numCheckUserAccessToQueueFailedRetrieved.value();
|
||||
}
|
||||
|
||||
public void succeededAppsCreated(long duration) {
|
||||
totalSucceededAppsCreated.add(duration);
|
||||
getNewApplicationLatency.add(duration);
|
||||
@ -1200,6 +1223,11 @@ public void succeededGetAppTimeoutsRetrieved(long duration) {
|
||||
getAppTimeoutsLatency.add(duration);
|
||||
}
|
||||
|
||||
public void succeededCheckUserAccessToQueueRetrieved(long duration) {
|
||||
totalSucceededCheckUserAccessToQueueRetrieved.add(duration);
|
||||
checkUserAccessToQueueLatency.add(duration);
|
||||
}
|
||||
|
||||
public void incrAppsFailedCreated() {
|
||||
numAppsFailedCreated.incr();
|
||||
}
|
||||
@ -1359,4 +1387,8 @@ public void incrGetAppTimeoutFailedRetrieved() {
|
||||
public void incrGetAppTimeoutsFailedRetrieved() {
|
||||
numGetAppTimeoutsFailedRetrieved.incr();
|
||||
}
|
||||
|
||||
public void incrCheckUserAccessToQueueFailedRetrieved() {
|
||||
numCheckUserAccessToQueueFailedRetrieved.incr();
|
||||
}
|
||||
}
|
||||
|
@ -526,10 +526,10 @@ public AppAttemptsInfo getAppAttempts(HttpServletRequest hsr, String appId) {
|
||||
|
||||
@Override
|
||||
public RMQueueAclInfo checkUserAccessToQueue(String queue, String username,
|
||||
String queueAclType, HttpServletRequest hsr) {
|
||||
String queueAclType, HttpServletRequest hsr) throws AuthorizationException {
|
||||
return RouterWebServiceUtil.genericForward(webAppAddress, hsr,
|
||||
RMQueueAclInfo.class, HTTPMethods.GET,
|
||||
RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.QUEUES + "/" + queue
|
||||
RMWSConsts.RM_WEB_SERVICE_PATH + "/" + RMWSConsts.QUEUES + "/" + queue
|
||||
+ "/access", null, null, getConf(), client);
|
||||
}
|
||||
|
||||
|
@ -45,6 +45,7 @@
|
||||
|
||||
import org.apache.commons.lang3.NotImplementedException;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.security.authorize.AuthorizationException;
|
||||
import org.apache.hadoop.util.ReflectionUtils;
|
||||
@ -104,6 +105,7 @@
|
||||
import org.apache.hadoop.yarn.server.router.RouterServerUtil;
|
||||
import org.apache.hadoop.yarn.server.router.clientrm.ClientMethod;
|
||||
import org.apache.hadoop.yarn.server.router.webapp.cache.RouterAppInfoCacheKey;
|
||||
import org.apache.hadoop.yarn.server.router.webapp.dao.FederationRMQueueAclInfo;
|
||||
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
|
||||
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
|
||||
import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
|
||||
@ -1808,8 +1810,54 @@ public AppAttemptsInfo getAppAttempts(HttpServletRequest hsr, String appId) {
|
||||
|
||||
@Override
|
||||
public RMQueueAclInfo checkUserAccessToQueue(String queue, String username,
|
||||
String queueAclType, HttpServletRequest hsr) {
|
||||
throw new NotImplementedException("Code is not implemented");
|
||||
String queueAclType, HttpServletRequest hsr) throws AuthorizationException {
|
||||
|
||||
// Parameter Verification
|
||||
if (queue == null || queue.isEmpty()) {
|
||||
routerMetrics.incrCheckUserAccessToQueueFailedRetrieved();
|
||||
throw new IllegalArgumentException("Parameter error, the queue is empty or null.");
|
||||
}
|
||||
|
||||
if (username == null || username.isEmpty()) {
|
||||
routerMetrics.incrCheckUserAccessToQueueFailedRetrieved();
|
||||
throw new IllegalArgumentException("Parameter error, the username is empty or null.");
|
||||
}
|
||||
|
||||
if (queueAclType == null || queueAclType.isEmpty()) {
|
||||
routerMetrics.incrCheckUserAccessToQueueFailedRetrieved();
|
||||
throw new IllegalArgumentException("Parameter error, the queueAclType is empty or null.");
|
||||
}
|
||||
|
||||
// Traverse SubCluster and call checkUserAccessToQueue Api
|
||||
try {
|
||||
long startTime = Time.now();
|
||||
Map<SubClusterId, SubClusterInfo> subClustersActive = getActiveSubclusters();
|
||||
final HttpServletRequest hsrCopy = clone(hsr);
|
||||
Class[] argsClasses = new Class[]{String.class, String.class, String.class,
|
||||
HttpServletRequest.class};
|
||||
Object[] args = new Object[]{queue, username, queueAclType, hsrCopy};
|
||||
ClientMethod remoteMethod = new ClientMethod("checkUserAccessToQueue", argsClasses, args);
|
||||
Map<SubClusterInfo, RMQueueAclInfo> rmQueueAclInfoMap =
|
||||
invokeConcurrent(subClustersActive.values(), remoteMethod, RMQueueAclInfo.class);
|
||||
FederationRMQueueAclInfo aclInfo = new FederationRMQueueAclInfo();
|
||||
rmQueueAclInfoMap.forEach((subClusterInfo, rMQueueAclInfo) -> {
|
||||
SubClusterId subClusterId = subClusterInfo.getSubClusterId();
|
||||
rMQueueAclInfo.setSubClusterId(subClusterId.getId());
|
||||
aclInfo.getList().add(rMQueueAclInfo);
|
||||
});
|
||||
long stopTime = Time.now();
|
||||
routerMetrics.succeededCheckUserAccessToQueueRetrieved(stopTime - startTime);
|
||||
return aclInfo;
|
||||
} catch (NotFoundException e) {
|
||||
routerMetrics.incrCheckUserAccessToQueueFailedRetrieved();
|
||||
RouterServerUtil.logAndThrowRunTimeException("Get all active sub cluster(s) error.", e);
|
||||
} catch (YarnException | IOException e) {
|
||||
routerMetrics.incrCheckUserAccessToQueueFailedRetrieved();
|
||||
RouterServerUtil.logAndThrowRunTimeException("checkUserAccessToQueue error.", e);
|
||||
}
|
||||
|
||||
routerMetrics.incrCheckUserAccessToQueueFailedRetrieved();
|
||||
throw new RuntimeException("checkUserAccessToQueue error.");
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -2024,9 +2072,14 @@ private <R> Map<SubClusterInfo, R> invokeConcurrent(Collection<SubClusterInfo> c
|
||||
Map<SubClusterInfo, R> results = new HashMap<>();
|
||||
|
||||
// Send the requests in parallel
|
||||
CompletionService<R> compSvc = new ExecutorCompletionService<>(this.threadpool);
|
||||
CompletionService<Pair<R, Exception>> compSvc =
|
||||
new ExecutorCompletionService<>(this.threadpool);
|
||||
|
||||
// Error Msg
|
||||
// This part of the code should be able to expose the accessed Exception information.
|
||||
// We use Pair to store related information. The left value of the Pair is the response,
|
||||
// and the right value is the exception.
|
||||
// If the request is normal, the response is not empty and the exception is empty;
|
||||
// if the request is abnormal, the response is empty and the exception is not empty.
|
||||
for (final SubClusterInfo info : clusterIds) {
|
||||
compSvc.submit(() -> {
|
||||
DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
|
||||
@ -2036,29 +2089,36 @@ private <R> Map<SubClusterInfo, R> invokeConcurrent(Collection<SubClusterInfo> c
|
||||
getMethod(request.getMethodName(), request.getTypes());
|
||||
Object retObj = method.invoke(interceptor, request.getParams());
|
||||
R ret = clazz.cast(retObj);
|
||||
return ret;
|
||||
return Pair.of(ret, null);
|
||||
} catch (Exception e) {
|
||||
LOG.error("SubCluster {} failed to call {} method.",
|
||||
info.getSubClusterId(), request.getMethodName(), e);
|
||||
return null;
|
||||
return Pair.of(null, e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
clusterIds.stream().forEach(clusterId -> {
|
||||
try {
|
||||
Future<R> future = compSvc.take();
|
||||
R response = future.get();
|
||||
Future<Pair<R, Exception>> future = compSvc.take();
|
||||
Pair<R, Exception> pair = future.get();
|
||||
R response = pair.getKey();
|
||||
if (response != null) {
|
||||
results.put(clusterId, response);
|
||||
}
|
||||
|
||||
Exception exception = pair.getRight();
|
||||
if (exception != null) {
|
||||
throw exception;
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
String msg = String.format("SubCluster %s failed to %s report.",
|
||||
clusterId, request.getMethodName());
|
||||
LOG.warn(msg, e);
|
||||
throw new YarnRuntimeException(msg, e);
|
||||
clusterId.getSubClusterId(), request.getMethodName());
|
||||
LOG.error(msg, e);
|
||||
throw new YarnRuntimeException(e.getCause().getMessage(), e);
|
||||
}
|
||||
});
|
||||
|
||||
return results;
|
||||
}
|
||||
|
||||
|
@ -0,0 +1,50 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.yarn.server.router.webapp.dao;
|
||||
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.RMQueueAclInfo;
|
||||
|
||||
import javax.xml.bind.annotation.XmlAccessType;
|
||||
import javax.xml.bind.annotation.XmlAccessorType;
|
||||
import javax.xml.bind.annotation.XmlElement;
|
||||
import javax.xml.bind.annotation.XmlRootElement;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
@XmlRootElement
|
||||
@XmlAccessorType(XmlAccessType.FIELD)
|
||||
public class FederationRMQueueAclInfo extends RMQueueAclInfo {
|
||||
|
||||
@XmlElement(name = "subCluster")
|
||||
private List<RMQueueAclInfo> list = new ArrayList<>();
|
||||
|
||||
public FederationRMQueueAclInfo() {
|
||||
} // JAXB needs this
|
||||
|
||||
public FederationRMQueueAclInfo(ArrayList<RMQueueAclInfo> list) {
|
||||
this.list = list;
|
||||
}
|
||||
|
||||
public List<RMQueueAclInfo> getList() {
|
||||
return list;
|
||||
}
|
||||
|
||||
public void setList(List<RMQueueAclInfo> list) {
|
||||
this.list = list;
|
||||
}
|
||||
}
|
@ -513,6 +513,11 @@ public void getAppTimeoutsFailed() {
|
||||
LOG.info("Mocked: failed getAppTimeoutsFailed call");
|
||||
metrics.incrGetAppTimeoutsFailedRetrieved();
|
||||
}
|
||||
|
||||
public void getCheckUserAccessToQueueRetrieved() {
|
||||
LOG.info("Mocked: failed checkUserAccessToQueueRetrieved call");
|
||||
metrics.incrCheckUserAccessToQueueFailedRetrieved();
|
||||
}
|
||||
}
|
||||
|
||||
// Records successes for all calls
|
||||
@ -723,6 +728,11 @@ public void getAppTimeoutsRetrieved(long duration) {
|
||||
LOG.info("Mocked: successful getAppTimeouts call with duration {}", duration);
|
||||
metrics.succeededGetAppTimeoutsRetrieved(duration);
|
||||
}
|
||||
|
||||
public void getCheckUserAccessToQueueRetrieved(long duration) {
|
||||
LOG.info("Mocked: successful CheckUserAccessToQueue call with duration {}", duration);
|
||||
metrics.succeededCheckUserAccessToQueueRetrieved(duration);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -1465,4 +1475,27 @@ public void testGetAppTimeoutsRetrievedFailed() {
|
||||
Assert.assertEquals(totalBadBefore + 1,
|
||||
metrics.getAppTimeoutsFailedRetrieved());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCheckUserAccessToQueueRetrievedRetrieved() {
|
||||
long totalGoodBefore = metrics.getNumSucceededCheckUserAccessToQueueRetrievedRetrieved();
|
||||
goodSubCluster.getCheckUserAccessToQueueRetrieved(150);
|
||||
Assert.assertEquals(totalGoodBefore + 1,
|
||||
metrics.getNumSucceededCheckUserAccessToQueueRetrievedRetrieved());
|
||||
Assert.assertEquals(150,
|
||||
metrics.getLatencySucceededCheckUserAccessToQueueRetrieved(), ASSERT_DOUBLE_DELTA);
|
||||
goodSubCluster.getCheckUserAccessToQueueRetrieved(300);
|
||||
Assert.assertEquals(totalGoodBefore + 2,
|
||||
metrics.getNumSucceededCheckUserAccessToQueueRetrievedRetrieved());
|
||||
Assert.assertEquals(225,
|
||||
metrics.getLatencySucceededCheckUserAccessToQueueRetrieved(), ASSERT_DOUBLE_DELTA);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCheckUserAccessToQueueRetrievedFailed() {
|
||||
long totalBadBefore = metrics.getCheckUserAccessToQueueFailedRetrieved();
|
||||
badSubCluster.getCheckUserAccessToQueueRetrieved();
|
||||
Assert.assertEquals(totalBadBefore + 1,
|
||||
metrics.getCheckUserAccessToQueueFailedRetrieved());
|
||||
}
|
||||
}
|
||||
|
@ -20,6 +20,7 @@
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.ConnectException;
|
||||
import java.security.Principal;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Set;
|
||||
import java.util.Map;
|
||||
@ -32,14 +33,18 @@
|
||||
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
import javax.ws.rs.core.Context;
|
||||
import javax.ws.rs.core.HttpHeaders;
|
||||
import javax.ws.rs.core.Response;
|
||||
import javax.ws.rs.core.Response.Status;
|
||||
|
||||
import org.apache.commons.lang3.EnumUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.authorize.AuthorizationException;
|
||||
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet;
|
||||
import org.apache.hadoop.util.Sets;
|
||||
import org.apache.hadoop.util.Time;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
@ -60,6 +65,7 @@
|
||||
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationTimeout;
|
||||
import org.apache.hadoop.yarn.api.records.QueueACL;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.ReservationListRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.ReservationListResponse;
|
||||
@ -70,6 +76,7 @@
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
@ -111,6 +118,8 @@
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationStatisticsInfo;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppActivitiesInfo;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationListInfo;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.RMQueueAclInfo;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebServices;
|
||||
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
|
||||
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
|
||||
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
|
||||
@ -118,12 +127,14 @@
|
||||
import org.apache.hadoop.yarn.util.SystemClock;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
import org.apache.hadoop.yarn.webapp.BadRequestException;
|
||||
import org.apache.hadoop.yarn.webapp.ForbiddenException;
|
||||
import org.apache.hadoop.yarn.webapp.NotFoundException;
|
||||
import org.mockito.Mockito;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
/**
|
||||
* This class mocks the RESTRequestInterceptor.
|
||||
@ -907,4 +918,109 @@ private MockRM setupResourceManager() throws Exception {
|
||||
rm.registerNode("127.0.0.1:5678", 100*1024, 100);
|
||||
return rm;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RMQueueAclInfo checkUserAccessToQueue(String queue, String username,
|
||||
String queueAclType, HttpServletRequest hsr) throws AuthorizationException {
|
||||
|
||||
ResourceManager mockRM = mock(ResourceManager.class);
|
||||
Configuration conf = new YarnConfiguration();
|
||||
|
||||
ResourceScheduler mockScheduler = new CapacityScheduler() {
|
||||
@Override
|
||||
public synchronized boolean checkAccess(UserGroupInformation callerUGI,
|
||||
QueueACL acl, String queueName) {
|
||||
if (acl == QueueACL.ADMINISTER_QUEUE) {
|
||||
if (callerUGI.getUserName().equals("admin")) {
|
||||
return true;
|
||||
}
|
||||
} else {
|
||||
if (ImmutableSet.of("admin", "yarn").contains(callerUGI.getUserName())) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
};
|
||||
|
||||
when(mockRM.getResourceScheduler()).thenReturn(mockScheduler);
|
||||
MockRMWebServices webSvc = new MockRMWebServices(mockRM, conf, mock(HttpServletResponse.class));
|
||||
return webSvc.checkUserAccessToQueue(queue, username, queueAclType, hsr);
|
||||
}
|
||||
|
||||
class MockRMWebServices {
|
||||
|
||||
@Context
|
||||
private HttpServletResponse httpServletResponse;
|
||||
private ResourceManager resourceManager;
|
||||
|
||||
private void initForReadableEndpoints() {
|
||||
// clear content type
|
||||
httpServletResponse.setContentType(null);
|
||||
}
|
||||
|
||||
MockRMWebServices(ResourceManager rm, Configuration conf, HttpServletResponse response) {
|
||||
this.resourceManager = rm;
|
||||
this.httpServletResponse = response;
|
||||
}
|
||||
|
||||
private UserGroupInformation getCallerUserGroupInformation(
|
||||
HttpServletRequest hsr, boolean usePrincipal) {
|
||||
|
||||
String remoteUser = hsr.getRemoteUser();
|
||||
|
||||
if (usePrincipal) {
|
||||
Principal princ = hsr.getUserPrincipal();
|
||||
remoteUser = princ == null ? null : princ.getName();
|
||||
}
|
||||
|
||||
UserGroupInformation callerUGI = null;
|
||||
if (remoteUser != null) {
|
||||
callerUGI = UserGroupInformation.createRemoteUser(remoteUser);
|
||||
}
|
||||
|
||||
return callerUGI;
|
||||
}
|
||||
|
||||
public RMQueueAclInfo checkUserAccessToQueue(
|
||||
String queue, String username, String queueAclType, HttpServletRequest hsr)
|
||||
throws AuthorizationException {
|
||||
initForReadableEndpoints();
|
||||
|
||||
// For the user who invokes this REST call, he/she should have admin access
|
||||
// to the queue. Otherwise we will reject the call.
|
||||
UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true);
|
||||
if (callerUGI != null && !this.resourceManager.getResourceScheduler().checkAccess(
|
||||
callerUGI, QueueACL.ADMINISTER_QUEUE, queue)) {
|
||||
throw new ForbiddenException(
|
||||
"User=" + callerUGI.getUserName() + " doesn't haven access to queue="
|
||||
+ queue + " so it cannot check ACLs for other users.");
|
||||
}
|
||||
|
||||
// Create UGI for the to-be-checked user.
|
||||
UserGroupInformation user = UserGroupInformation.createRemoteUser(username);
|
||||
if (user == null) {
|
||||
throw new ForbiddenException(
|
||||
"Failed to retrieve UserGroupInformation for user=" + username);
|
||||
}
|
||||
|
||||
// Check if the specified queue acl is valid.
|
||||
QueueACL queueACL;
|
||||
try {
|
||||
queueACL = QueueACL.valueOf(queueAclType);
|
||||
} catch (IllegalArgumentException e) {
|
||||
throw new BadRequestException("Specified queueAclType=" + queueAclType
|
||||
+ " is not a valid type, valid queue acl types={"
|
||||
+ "SUBMIT_APPLICATIONS/ADMINISTER_QUEUE}");
|
||||
}
|
||||
|
||||
if (!this.resourceManager.getResourceScheduler().checkAccess(user, queueACL, queue)) {
|
||||
return new RMQueueAclInfo(false, user.getUserName(),
|
||||
"User=" + username + " doesn't have access to queue=" + queue
|
||||
+ " with acl-type=" + queueAclType);
|
||||
}
|
||||
|
||||
return new RMQueueAclInfo(true, user.getUserName(), "");
|
||||
}
|
||||
}
|
||||
}
|
@ -19,6 +19,7 @@
|
||||
package org.apache.hadoop.yarn.server.router.webapp;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.security.Principal;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.HashMap;
|
||||
@ -27,9 +28,11 @@
|
||||
import java.util.HashSet;
|
||||
import java.util.Collections;
|
||||
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.ws.rs.core.Response;
|
||||
import javax.ws.rs.core.Response.Status;
|
||||
|
||||
import org.apache.hadoop.security.authorize.AuthorizationException;
|
||||
import org.apache.hadoop.test.LambdaTestUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.http.HttpConfig;
|
||||
@ -43,8 +46,10 @@
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
|
||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.QueueACL;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager;
|
||||
import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
|
||||
@ -81,6 +86,7 @@
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppQueue;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationListInfo;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationInfo;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.RMQueueAclInfo;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.NodeIDsInfo;
|
||||
import org.apache.hadoop.yarn.server.router.webapp.cache.RouterAppInfoCacheKey;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationStatisticsInfo;
|
||||
@ -90,6 +96,7 @@
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationRequestInfo;
|
||||
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
|
||||
import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
|
||||
import org.apache.hadoop.yarn.server.router.webapp.dao.FederationRMQueueAclInfo;
|
||||
import org.apache.hadoop.yarn.util.LRUCacheHashMap;
|
||||
import org.apache.hadoop.yarn.util.MonotonicClock;
|
||||
import org.apache.hadoop.yarn.util.Times;
|
||||
@ -100,6 +107,8 @@
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import static org.apache.hadoop.yarn.server.router.webapp.MockDefaultRequestInterceptorREST.QUEUE_DEDICATED_FULL;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
/**
|
||||
* Extends the {@code BaseRouterClientRMTest} and overrides methods in order to
|
||||
@ -1170,4 +1179,82 @@ public void testWebAddressWithScheme() {
|
||||
WebAppUtils.getHttpSchemePrefix(this.getConf()) + webAppAddress;
|
||||
Assert.assertEquals(expectedHttpsWebAddress, webAppAddressWithScheme2);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCheckUserAccessToQueue() throws Exception {
|
||||
|
||||
// Case 1: Only queue admin user can access other user's information
|
||||
HttpServletRequest mockHsr = mockHttpServletRequestByUserName("non-admin");
|
||||
String errorMsg1 = "User=non-admin doesn't haven access to queue=queue " +
|
||||
"so it cannot check ACLs for other users.";
|
||||
LambdaTestUtils.intercept(YarnRuntimeException.class, errorMsg1,
|
||||
() -> interceptor.checkUserAccessToQueue("queue", "jack",
|
||||
QueueACL.SUBMIT_APPLICATIONS.name(), mockHsr));
|
||||
|
||||
// Case 2: request an unknown ACL causes BAD_REQUEST
|
||||
HttpServletRequest mockHsr1 = mockHttpServletRequestByUserName("admin");
|
||||
String errorMsg2 = "Specified queueAclType=XYZ_ACL is not a valid type, " +
|
||||
"valid queue acl types={SUBMIT_APPLICATIONS/ADMINISTER_QUEUE}";
|
||||
LambdaTestUtils.intercept(YarnRuntimeException.class, errorMsg2,
|
||||
() -> interceptor.checkUserAccessToQueue("queue", "jack", "XYZ_ACL", mockHsr1));
|
||||
|
||||
// We design a test, admin user has ADMINISTER_QUEUE, SUBMIT_APPLICATIONS permissions,
|
||||
// yarn user has SUBMIT_APPLICATIONS permissions, other users have no permissions
|
||||
|
||||
// Case 3: get FORBIDDEN for rejected ACL
|
||||
checkUserAccessToQueueFailed("queue", "jack", QueueACL.SUBMIT_APPLICATIONS, "admin");
|
||||
checkUserAccessToQueueFailed("queue", "jack", QueueACL.ADMINISTER_QUEUE, "admin");
|
||||
|
||||
// Case 4: get OK for listed ACLs
|
||||
checkUserAccessToQueueSuccess("queue", "admin", QueueACL.ADMINISTER_QUEUE, "admin");
|
||||
checkUserAccessToQueueSuccess("queue", "admin", QueueACL.SUBMIT_APPLICATIONS, "admin");
|
||||
|
||||
// Case 5: get OK only for SUBMIT_APP acl for "yarn" user
|
||||
checkUserAccessToQueueFailed("queue", "yarn", QueueACL.ADMINISTER_QUEUE, "admin");
|
||||
checkUserAccessToQueueSuccess("queue", "yarn", QueueACL.SUBMIT_APPLICATIONS, "admin");
|
||||
}
|
||||
|
||||
private void checkUserAccessToQueueSuccess(String queue, String userName,
|
||||
QueueACL queueACL, String mockUser) throws AuthorizationException {
|
||||
HttpServletRequest mockHsr = mockHttpServletRequestByUserName(mockUser);
|
||||
RMQueueAclInfo aclInfo =
|
||||
interceptor.checkUserAccessToQueue(queue, userName, queueACL.name(), mockHsr);
|
||||
Assert.assertNotNull(aclInfo);
|
||||
Assert.assertTrue(aclInfo instanceof FederationRMQueueAclInfo);
|
||||
FederationRMQueueAclInfo fedAclInfo = FederationRMQueueAclInfo.class.cast(aclInfo);
|
||||
List<RMQueueAclInfo> aclInfos = fedAclInfo.getList();
|
||||
Assert.assertNotNull(aclInfos);
|
||||
Assert.assertEquals(4, aclInfos.size());
|
||||
for (RMQueueAclInfo rMQueueAclInfo : aclInfos) {
|
||||
Assert.assertTrue(rMQueueAclInfo.isAllowed());
|
||||
}
|
||||
}
|
||||
|
||||
private void checkUserAccessToQueueFailed(String queue, String userName,
|
||||
QueueACL queueACL, String mockUser) throws AuthorizationException {
|
||||
HttpServletRequest mockHsr = mockHttpServletRequestByUserName(mockUser);
|
||||
RMQueueAclInfo aclInfo =
|
||||
interceptor.checkUserAccessToQueue(queue, userName, queueACL.name(), mockHsr);
|
||||
Assert.assertNotNull(aclInfo);
|
||||
Assert.assertTrue(aclInfo instanceof FederationRMQueueAclInfo);
|
||||
FederationRMQueueAclInfo fedAclInfo = FederationRMQueueAclInfo.class.cast(aclInfo);
|
||||
List<RMQueueAclInfo> aclInfos = fedAclInfo.getList();
|
||||
Assert.assertNotNull(aclInfos);
|
||||
Assert.assertEquals(4, aclInfos.size());
|
||||
for (RMQueueAclInfo rMQueueAclInfo : aclInfos) {
|
||||
Assert.assertFalse(rMQueueAclInfo.isAllowed());
|
||||
String expectDiagnostics = "User=" + userName +
|
||||
" doesn't have access to queue=queue with acl-type=" + queueACL.name();
|
||||
Assert.assertEquals(expectDiagnostics, rMQueueAclInfo.getDiagnostics());
|
||||
}
|
||||
}
|
||||
|
||||
private HttpServletRequest mockHttpServletRequestByUserName(String username) {
|
||||
HttpServletRequest mockHsr = mock(HttpServletRequest.class);
|
||||
when(mockHsr.getRemoteUser()).thenReturn(username);
|
||||
Principal principal = mock(Principal.class);
|
||||
when(principal.getName()).thenReturn(username);
|
||||
when(mockHsr.getUserPrincipal()).thenReturn(principal);
|
||||
return mockHsr;
|
||||
}
|
||||
}
|
@ -25,9 +25,11 @@
|
||||
|
||||
import javax.ws.rs.core.Response;
|
||||
|
||||
import org.apache.hadoop.test.LambdaTestUtils;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
|
||||
import org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager;
|
||||
import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
|
||||
@ -40,7 +42,6 @@
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NewApplication;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeInfo;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodesInfo;
|
||||
import org.apache.hadoop.yarn.server.router.clientrm.PassThroughClientRequestInterceptor;
|
||||
import org.apache.hadoop.yarn.server.router.clientrm.TestableFederationClientInterceptor;
|
||||
import org.apache.hadoop.yarn.webapp.NotFoundException;
|
||||
@ -377,15 +378,12 @@ public void testGetNodeOneBadOneGood()
|
||||
* composed of only 1 bad SubCluster.
|
||||
*/
|
||||
@Test
|
||||
public void testGetNodesOneBadSC()
|
||||
throws YarnException, IOException, InterruptedException {
|
||||
public void testGetNodesOneBadSC() throws Exception {
|
||||
|
||||
setupCluster(Arrays.asList(bad2));
|
||||
|
||||
NodesInfo response = interceptor.getNodes(null);
|
||||
Assert.assertNotNull(response);
|
||||
Assert.assertEquals(0, response.getNodes().size());
|
||||
// The remove duplicate operations is tested in TestRouterWebServiceUtil
|
||||
LambdaTestUtils.intercept(YarnRuntimeException.class, "RM is stopped",
|
||||
() -> interceptor.getNodes(null));
|
||||
}
|
||||
|
||||
/**
|
||||
@ -393,14 +391,12 @@ public void testGetNodesOneBadSC()
|
||||
* composed of only 2 bad SubClusters.
|
||||
*/
|
||||
@Test
|
||||
public void testGetNodesTwoBadSCs()
|
||||
throws YarnException, IOException, InterruptedException {
|
||||
public void testGetNodesTwoBadSCs() throws Exception {
|
||||
|
||||
setupCluster(Arrays.asList(bad1, bad2));
|
||||
|
||||
NodesInfo response = interceptor.getNodes(null);
|
||||
Assert.assertNotNull(response);
|
||||
Assert.assertEquals(0, response.getNodes().size());
|
||||
// The remove duplicate operations is tested in TestRouterWebServiceUtil
|
||||
LambdaTestUtils.intercept(YarnRuntimeException.class, "RM is stopped",
|
||||
() -> interceptor.getNodes(null));
|
||||
}
|
||||
|
||||
/**
|
||||
@ -408,17 +404,11 @@ public void testGetNodesTwoBadSCs()
|
||||
* composed of only 1 bad SubCluster and a good one.
|
||||
*/
|
||||
@Test
|
||||
public void testGetNodesOneBadOneGood()
|
||||
throws YarnException, IOException, InterruptedException {
|
||||
public void testGetNodesOneBadOneGood() throws Exception {
|
||||
setupCluster(Arrays.asList(good, bad2));
|
||||
|
||||
NodesInfo response = interceptor.getNodes(null);
|
||||
Assert.assertNotNull(response);
|
||||
Assert.assertEquals(1, response.getNodes().size());
|
||||
// Check if the only node came from Good SubCluster
|
||||
Assert.assertEquals(good.getId(),
|
||||
Long.toString(response.getNodes().get(0).getLastHealthUpdate()));
|
||||
// The remove duplicate operations is tested in TestRouterWebServiceUtil
|
||||
LambdaTestUtils.intercept(YarnRuntimeException.class, "RM is stopped",
|
||||
() -> interceptor.getNodes(null));
|
||||
}
|
||||
|
||||
/**
|
||||
|
Loading…
Reference in New Issue
Block a user