getApplicationReport call may raise NPE for removed queues. (Jian He via wangda)
This commit is contained in:
parent
c9bb96fa81
commit
23248f63aa
@ -305,9 +305,7 @@ private boolean checkAccess(UserGroupInformation callerUGI, String owner,
|
||||
return applicationsACLsManager
|
||||
.checkAccess(callerUGI, operationPerformed, owner,
|
||||
application.getApplicationId()) || queueACLsManager
|
||||
.checkAccess(callerUGI, QueueACL.ADMINISTER_QUEUE,
|
||||
application.getQueue(), application.getApplicationId(),
|
||||
application.getName());
|
||||
.checkAccess(callerUGI, QueueACL.ADMINISTER_QUEUE, application);
|
||||
}
|
||||
|
||||
ApplicationId getNewApplicationId() {
|
||||
|
@ -18,20 +18,27 @@
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.security;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.QueueACL;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.security.AccessRequest;
|
||||
import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
||||
|
||||
public class QueueACLsManager {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(QueueACLsManager.class);
|
||||
|
||||
private ResourceScheduler scheduler;
|
||||
private boolean isACLsEnable;
|
||||
private YarnAuthorizationProvider authorizer;
|
||||
@ -49,17 +56,28 @@ public QueueACLsManager(ResourceScheduler scheduler, Configuration conf) {
|
||||
}
|
||||
|
||||
public boolean checkAccess(UserGroupInformation callerUGI, QueueACL acl,
|
||||
String queueName, ApplicationId appId, String appName) {
|
||||
RMApp app) {
|
||||
if (!isACLsEnable) {
|
||||
return true;
|
||||
}
|
||||
|
||||
if (scheduler instanceof CapacityScheduler) {
|
||||
return authorizer.checkPermission(new AccessRequest(
|
||||
((CapacityScheduler) scheduler).getQueue(queueName)
|
||||
.getPrivilegedEntity(), callerUGI,
|
||||
SchedulerUtils.toAccessType(acl), appId.toString(), appName));
|
||||
CSQueue queue = ((CapacityScheduler) scheduler).getQueue(app.getQueue());
|
||||
if (queue == null) {
|
||||
// Application exists but the associated queue does not exist.
|
||||
// This may happen if the queue is removed after the RM restarts. Here, we choose
|
||||
// to allow users to view the apps of a removed queue.
|
||||
LOG.error("Queue " + app.getQueue() + " does not exist for " + app
|
||||
.getApplicationId());
|
||||
return true;
|
||||
}
|
||||
|
||||
return authorizer.checkPermission(
|
||||
new AccessRequest(queue.getPrivilegedEntity(), callerUGI,
|
||||
SchedulerUtils.toAccessType(acl),
|
||||
app.getApplicationId().toString(), app.getName()));
|
||||
} else {
|
||||
return scheduler.checkAccess(callerUGI, acl, queueName);
|
||||
return scheduler.checkAccess(callerUGI, acl, app.getQueue());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -232,8 +232,7 @@ protected Boolean hasAccess(RMApp app, HttpServletRequest hsr) {
|
||||
ApplicationAccessType.VIEW_APP, app.getUser(),
|
||||
app.getApplicationId()) ||
|
||||
this.rm.getQueueACLsManager().checkAccess(callerUGI,
|
||||
QueueACL.ADMINISTER_QUEUE, app.getQueue(),
|
||||
app.getApplicationId(), app.getName()))) {
|
||||
QueueACL.ADMINISTER_QUEUE, app))) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
|
@ -21,7 +21,6 @@
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.anyString;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
@ -30,6 +29,7 @@
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.junit.Assert;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
@ -112,8 +112,7 @@ protected QueueACLsManager createQueueACLsManager(
|
||||
Configuration conf) {
|
||||
QueueACLsManager mockQueueACLsManager = mock(QueueACLsManager.class);
|
||||
when(mockQueueACLsManager.checkAccess(any(UserGroupInformation.class),
|
||||
any(QueueACL.class), anyString(), any(ApplicationId.class),
|
||||
anyString())).thenAnswer(new Answer() {
|
||||
any(QueueACL.class), any(RMApp.class))).thenAnswer(new Answer() {
|
||||
public Object answer(InvocationOnMock invocation) {
|
||||
return isQueueUser;
|
||||
}
|
||||
|
@ -473,8 +473,7 @@ public void handle(Event event) {
|
||||
QueueACLsManager mockQueueACLsManager = mock(QueueACLsManager.class);
|
||||
when(
|
||||
mockQueueACLsManager.checkAccess(any(UserGroupInformation.class),
|
||||
any(QueueACL.class), anyString(), any(ApplicationId.class),
|
||||
anyString())).thenReturn(true);
|
||||
any(QueueACL.class), any(RMApp.class))).thenReturn(true);
|
||||
return new ClientRMService(rmContext, yarnScheduler, appManager,
|
||||
mockAclsManager, mockQueueACLsManager, null);
|
||||
}
|
||||
@ -575,8 +574,7 @@ public void testGetQueueInfo() throws Exception {
|
||||
ApplicationACLsManager mockAclsManager = mock(ApplicationACLsManager.class);
|
||||
QueueACLsManager mockQueueACLsManager = mock(QueueACLsManager.class);
|
||||
when(mockQueueACLsManager.checkAccess(any(UserGroupInformation.class),
|
||||
any(QueueACL.class), anyString(), any(ApplicationId.class),
|
||||
anyString())).thenReturn(true);
|
||||
any(QueueACL.class), any(RMApp.class))).thenReturn(true);
|
||||
when(mockAclsManager.checkAccess(any(UserGroupInformation.class),
|
||||
any(ApplicationAccessType.class), anyString(),
|
||||
any(ApplicationId.class))).thenReturn(true);
|
||||
@ -602,8 +600,7 @@ public void testGetQueueInfo() throws Exception {
|
||||
QueueACLsManager mockQueueACLsManager1 =
|
||||
mock(QueueACLsManager.class);
|
||||
when(mockQueueACLsManager1.checkAccess(any(UserGroupInformation.class),
|
||||
any(QueueACL.class), anyString(), any(ApplicationId.class),
|
||||
anyString())).thenReturn(false);
|
||||
any(QueueACL.class), any(RMApp.class))).thenReturn(false);
|
||||
when(mockAclsManager1.checkAccess(any(UserGroupInformation.class),
|
||||
any(ApplicationAccessType.class), anyString(),
|
||||
any(ApplicationId.class))).thenReturn(false);
|
||||
@ -642,8 +639,7 @@ public void handle(Event event) {}
|
||||
|
||||
QueueACLsManager mockQueueACLsManager = mock(QueueACLsManager.class);
|
||||
when(mockQueueACLsManager.checkAccess(any(UserGroupInformation.class),
|
||||
any(QueueACL.class), anyString(), any(ApplicationId.class),
|
||||
anyString())).thenReturn(true);
|
||||
any(QueueACL.class), any(RMApp.class))).thenReturn(true);
|
||||
ClientRMService rmService =
|
||||
new ClientRMService(rmContext, yarnScheduler, appManager,
|
||||
mockAclsManager, mockQueueACLsManager, null);
|
||||
@ -731,8 +727,7 @@ public void handle(Event event) {}
|
||||
ApplicationACLsManager mockAclsManager = mock(ApplicationACLsManager.class);
|
||||
QueueACLsManager mockQueueACLsManager = mock(QueueACLsManager.class);
|
||||
when(mockQueueACLsManager.checkAccess(any(UserGroupInformation.class),
|
||||
any(QueueACL.class), anyString(), any(ApplicationId.class),
|
||||
anyString())).thenReturn(true);
|
||||
any(QueueACL.class), any(RMApp.class))).thenReturn(true);
|
||||
ClientRMService rmService =
|
||||
new ClientRMService(rmContext, yarnScheduler, appManager,
|
||||
mockAclsManager, mockQueueACLsManager, null);
|
||||
|
@ -18,23 +18,7 @@
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import com.google.common.base.Supplier;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||
@ -99,7 +83,22 @@
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import com.google.common.base.Supplier;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.net.UnknownHostException;
|
||||
import java.security.PrivilegedAction;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
@SuppressWarnings({"rawtypes", "unchecked"})
|
||||
public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase {
|
||||
@ -564,6 +563,43 @@ private void setupQueueConfigurationChildOfB(CapacitySchedulerConfiguration conf
|
||||
.MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT, 0.5f);
|
||||
}
|
||||
|
||||
// 1. submit an app to default queue and let it finish
|
||||
// 2. restart rm with no default queue
|
||||
// 3. getApplicationReport call should succeed (with no NPE)
|
||||
@Test (timeout = 30000)
|
||||
public void testRMRestartWithRemovedQueue() throws Exception{
|
||||
conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);
|
||||
conf.set(YarnConfiguration.YARN_ADMIN_ACL, "");
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||
memStore.init(conf);
|
||||
rm1 = new MockRM(conf, memStore);
|
||||
rm1.start();
|
||||
MockNM nm1 =
|
||||
new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService());
|
||||
nm1.registerNode();
|
||||
final RMApp app1 = rm1.submitApp(1024, "app1", USER_1, null);
|
||||
MockAM am1 = MockRM.launchAndRegisterAM(app1,rm1, nm1);
|
||||
MockRM.finishAMAndVerifyAppState(app1, rm1, nm1, am1);
|
||||
|
||||
CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(conf);
|
||||
csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[]{QUEUE_DOESNT_EXIST});
|
||||
final String noQueue = CapacitySchedulerConfiguration.ROOT + "." + QUEUE_DOESNT_EXIST;
|
||||
csConf.setCapacity(noQueue, 100);
|
||||
rm2 = new MockRM(csConf,memStore);
|
||||
|
||||
rm2.start();
|
||||
UserGroupInformation user2 = UserGroupInformation.createRemoteUser("user2");
|
||||
|
||||
ApplicationReport report =
|
||||
user2.doAs(new PrivilegedExceptionAction<ApplicationReport>() {
|
||||
@Override
|
||||
public ApplicationReport run() throws Exception {
|
||||
return rm2.getApplicationReport(app1.getApplicationId());
|
||||
}
|
||||
});
|
||||
Assert.assertNotNull(report);
|
||||
}
|
||||
|
||||
// Test CS recovery with multi-level queues and multi-users:
|
||||
// 1. setup 2 NMs each with 8GB memory;
|
||||
// 2. setup 2 level queues: Default -> (QueueA, QueueB)
|
||||
|
Loading…
Reference in New Issue
Block a user