HDDS-29. Fix TestStorageContainerManager#testRpcPermission. Contributed by Mukul Kumar Singh.

This commit is contained in:
Xiaoyu Yao 2018-05-14 09:09:25 -07:00
parent 8a2b5914f3
commit fc5d49c202
3 changed files with 28 additions and 23 deletions

View File

@ -40,6 +40,7 @@
import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ozone.protocolPB import org.apache.hadoop.ozone.protocolPB
.StorageContainerLocationProtocolServerSideTranslatorPB; .StorageContainerLocationProtocolServerSideTranslatorPB;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -137,17 +138,26 @@ public void join() throws InterruptedException {
getClientRpcServer().join(); getClientRpcServer().join();
} }
@VisibleForTesting
public String getRpcRemoteUsername() {
UserGroupInformation user = ProtobufRpcEngine.Server.getRemoteUser();
return user == null ? null : user.getUserName();
}
@Override @Override
public ContainerInfo allocateContainer(HddsProtos.ReplicationType public ContainerInfo allocateContainer(HddsProtos.ReplicationType
replicationType, HddsProtos.ReplicationFactor factor, replicationType, HddsProtos.ReplicationFactor factor,
String owner) throws IOException { String owner) throws IOException {
getScm().checkAdminAccess(); String remoteUser = getRpcRemoteUsername();
getScm().checkAdminAccess(remoteUser);
return scm.getScmContainerManager() return scm.getScmContainerManager()
.allocateContainer(replicationType, factor, owner); .allocateContainer(replicationType, factor, owner);
} }
@Override @Override
public ContainerInfo getContainer(long containerID) throws IOException { public ContainerInfo getContainer(long containerID) throws IOException {
String remoteUser = getRpcRemoteUsername();
getScm().checkAdminAccess(remoteUser);
return scm.getScmContainerManager() return scm.getScmContainerManager()
.getContainer(containerID); .getContainer(containerID);
} }
@ -161,7 +171,8 @@ public List<ContainerInfo> listContainer(long startContainerID,
@Override @Override
public void deleteContainer(long containerID) throws IOException { public void deleteContainer(long containerID) throws IOException {
getScm().checkAdminAccess(); String remoteUser = getRpcRemoteUsername();
getScm().checkAdminAccess(remoteUser);
scm.getScmContainerManager().deleteContainer(containerID); scm.getScmContainerManager().deleteContainer(containerID);
} }

View File

@ -620,14 +620,7 @@ public BlockManager getScmBlockManager() {
return scmBlockManager; return scmBlockManager;
} }
@VisibleForTesting public void checkAdminAccess(String remoteUser) throws IOException {
public String getPpcRemoteUsername() {
UserGroupInformation user = ProtobufRpcEngine.Server.getRemoteUser();
return user == null ? null : user.getUserName();
}
public void checkAdminAccess() throws IOException {
String remoteUser = getPpcRemoteUsername();
if (remoteUser != null) { if (remoteUser != null) {
if (!scmAdminUsernames.contains(remoteUser)) { if (!scmAdminUsernames.contains(remoteUser)) {
throw new IOException( throw new IOException(

View File

@ -23,6 +23,7 @@
import org.apache.commons.lang3.RandomUtils; import org.apache.commons.lang3.RandomUtils;
import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.server.SCMClientProtocolServer;
import org.apache.hadoop.hdds.scm.server.SCMStorage; import org.apache.hadoop.hdds.scm.server.SCMStorage;
import org.apache.hadoop.hdds.scm.node.NodeManager; import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.ozone.container.ContainerTestHelper; import org.apache.hadoop.ozone.container.ContainerTestHelper;
@ -107,19 +108,19 @@ private void testRpcPermissionWithConf(
MiniOzoneCluster cluster = MiniOzoneCluster.newBuilder(ozoneConf).build(); MiniOzoneCluster cluster = MiniOzoneCluster.newBuilder(ozoneConf).build();
cluster.waitForClusterToBeReady(); cluster.waitForClusterToBeReady();
try { try {
String fakeUser = fakeRemoteUsername;
StorageContainerManager mockScm = Mockito.spy( SCMClientProtocolServer mockClientServer = Mockito.spy(
cluster.getStorageContainerManager()); cluster.getStorageContainerManager().getClientProtocolServer());
Mockito.when(mockScm.getPpcRemoteUsername()) Mockito.when(mockClientServer.getRpcRemoteUsername())
.thenReturn(fakeUser); .thenReturn(fakeRemoteUsername);
try { try {
mockScm.getClientProtocolServer().deleteContainer( mockClientServer.deleteContainer(
ContainerTestHelper.getTestContainerID()); ContainerTestHelper.getTestContainerID());
fail("Operation should fail, expecting an IOException here."); fail("Operation should fail, expecting an IOException here.");
} catch (Exception e) { } catch (Exception e) {
if (expectPermissionDenied) { if (expectPermissionDenied) {
verifyPermissionDeniedException(e, fakeUser); verifyPermissionDeniedException(e, fakeRemoteUsername);
} else { } else {
// If passes permission check, it should fail with // If passes permission check, it should fail with
// container not exist exception. // container not exist exception.
@ -129,7 +130,7 @@ private void testRpcPermissionWithConf(
} }
try { try {
ContainerInfo container2 = mockScm.getClientProtocolServer() ContainerInfo container2 = mockClientServer
.allocateContainer(xceiverClientManager.getType(), .allocateContainer(xceiverClientManager.getType(),
HddsProtos.ReplicationFactor.ONE, "OZONE"); HddsProtos.ReplicationFactor.ONE, "OZONE");
if (expectPermissionDenied) { if (expectPermissionDenied) {
@ -138,11 +139,11 @@ private void testRpcPermissionWithConf(
Assert.assertEquals(1, container2.getPipeline().getMachines().size()); Assert.assertEquals(1, container2.getPipeline().getMachines().size());
} }
} catch (Exception e) { } catch (Exception e) {
verifyPermissionDeniedException(e, fakeUser); verifyPermissionDeniedException(e, fakeRemoteUsername);
} }
try { try {
ContainerInfo container3 = mockScm.getClientProtocolServer() ContainerInfo container3 = mockClientServer
.allocateContainer(xceiverClientManager.getType(), .allocateContainer(xceiverClientManager.getType(),
HddsProtos.ReplicationFactor.ONE, "OZONE"); HddsProtos.ReplicationFactor.ONE, "OZONE");
if (expectPermissionDenied) { if (expectPermissionDenied) {
@ -151,16 +152,16 @@ private void testRpcPermissionWithConf(
Assert.assertEquals(1, container3.getPipeline().getMachines().size()); Assert.assertEquals(1, container3.getPipeline().getMachines().size());
} }
} catch (Exception e) { } catch (Exception e) {
verifyPermissionDeniedException(e, fakeUser); verifyPermissionDeniedException(e, fakeRemoteUsername);
} }
try { try {
mockScm.getClientProtocolServer().getContainer( mockClientServer.getContainer(
ContainerTestHelper.getTestContainerID()); ContainerTestHelper.getTestContainerID());
fail("Operation should fail, expecting an IOException here."); fail("Operation should fail, expecting an IOException here.");
} catch (Exception e) { } catch (Exception e) {
if (expectPermissionDenied) { if (expectPermissionDenied) {
verifyPermissionDeniedException(e, fakeUser); verifyPermissionDeniedException(e, fakeRemoteUsername);
} else { } else {
// If passes permission check, it should fail with // If passes permission check, it should fail with
// key not exist exception. // key not exist exception.