YARN-11375. [Federation] Support refreshAdminAcls、refreshServiceAcls API's for Federation. (#5312)

This commit is contained in:
slfan1989 2023-03-01 06:44:00 +08:00 committed by GitHub
parent dcd9dc6983
commit bcc51ce2c5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 399 additions and 20 deletions

View File

@ -33,4 +33,27 @@ public static RefreshAdminAclsRequest newInstance() {
Records.newRecord(RefreshAdminAclsRequest.class);
return request;
}
@Public
@Stable
public static RefreshAdminAclsRequest newInstance(String subClusterId) {
RefreshAdminAclsRequest request =
Records.newRecord(RefreshAdminAclsRequest.class);
request.setSubClusterId(subClusterId);
return request;
}
/**
* Get the subClusterId.
*
* @return subClusterId.
*/
public abstract String getSubClusterId();
/**
* Set the subClusterId.
*
* @param subClusterId subCluster Id.
*/
public abstract void setSubClusterId(String subClusterId);
}

View File

@ -33,4 +33,27 @@ public static RefreshServiceAclsRequest newInstance() {
Records.newRecord(RefreshServiceAclsRequest.class);
return request;
}
@Public
@Stable
public static RefreshServiceAclsRequest newInstance(String subClusterId) {
RefreshServiceAclsRequest request =
Records.newRecord(RefreshServiceAclsRequest.class);
request.setSubClusterId(subClusterId);
return request;
}
/**
* Get the subClusterId.
*
* @return subClusterId.
*/
public abstract String getSubClusterId();
/**
* Set the subClusterId.
*
* @param subClusterId subCluster Id.
*/
public abstract void setSubClusterId(String subClusterId);
}

View File

@ -58,11 +58,13 @@ message RefreshUserToGroupsMappingsResponseProto {
}
message RefreshAdminAclsRequestProto {
optional string sub_cluster_id = 1;
}
message RefreshAdminAclsResponseProto {
}
message RefreshServiceAclsRequestProto {
optional string sub_cluster_id = 1;
}
message RefreshServiceAclsResponseProto {
}

View File

@ -18,21 +18,22 @@
package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshAdminAclsRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshAdminAclsRequestProtoOrBuilder;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest;
import org.apache.hadoop.thirdparty.protobuf.TextFormat;
@Private
@Unstable
public class RefreshAdminAclsRequestPBImpl
extends RefreshAdminAclsRequest {
public class RefreshAdminAclsRequestPBImpl extends RefreshAdminAclsRequest {
RefreshAdminAclsRequestProto proto = RefreshAdminAclsRequestProto.getDefaultInstance();
RefreshAdminAclsRequestProto.Builder builder = null;
boolean viaProto = false;
private RefreshAdminAclsRequestProto proto = RefreshAdminAclsRequestProto.getDefaultInstance();
private RefreshAdminAclsRequestProto.Builder builder = null;
private boolean viaProto = false;
public RefreshAdminAclsRequestPBImpl() {
builder = RefreshAdminAclsRequestProto.newBuilder();
@ -48,6 +49,13 @@ public RefreshAdminAclsRequestProto getProto() {
viaProto = true;
return proto;
}
private synchronized void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = RefreshAdminAclsRequestProto.newBuilder(proto);
}
viaProto = false;
}
@Override
public int hashCode() {
@ -56,16 +64,39 @@ public int hashCode() {
@Override
public boolean equals(Object other) {
if (other == null)
if (!(other instanceof RefreshAdminAclsRequest)) {
return false;
if (other.getClass().isAssignableFrom(this.getClass())) {
return this.getProto().equals(this.getClass().cast(other).getProto());
}
return false;
RefreshAdminAclsRequestPBImpl otherImpl = this.getClass().cast(other);
return new EqualsBuilder()
.append(this.getProto(), otherImpl.getProto())
.isEquals();
}
@Override
public String toString() {
return TextFormat.shortDebugString(getProto());
}
@Override
public String getSubClusterId() {
RefreshAdminAclsRequestProtoOrBuilder p = viaProto ? proto : builder;
boolean hasSubClusterId = p.hasSubClusterId();
if (hasSubClusterId) {
return p.getSubClusterId();
}
return null;
}
@Override
public void setSubClusterId(String subClusterId) {
maybeInitBuilder();
if (subClusterId == null) {
builder.clearSubClusterId();
return;
}
builder.setSubClusterId(subClusterId);
}
}

View File

@ -18,9 +18,11 @@
package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshServiceAclsRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshServiceAclsRequestProtoOrBuilder;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsRequest;
import org.apache.hadoop.thirdparty.protobuf.TextFormat;
@ -29,10 +31,10 @@
@Unstable
public class RefreshServiceAclsRequestPBImpl extends RefreshServiceAclsRequest {
RefreshServiceAclsRequestProto proto =
private RefreshServiceAclsRequestProto proto =
RefreshServiceAclsRequestProto.getDefaultInstance();
RefreshServiceAclsRequestProto.Builder builder = null;
boolean viaProto = false;
private RefreshServiceAclsRequestProto.Builder builder = null;
private boolean viaProto = false;
public RefreshServiceAclsRequestPBImpl() {
builder = RefreshServiceAclsRequestProto.newBuilder();
@ -50,6 +52,13 @@ public RefreshServiceAclsRequestProto getProto() {
return proto;
}
private synchronized void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = RefreshServiceAclsRequestProto.newBuilder(proto);
}
viaProto = false;
}
@Override
public int hashCode() {
return getProto().hashCode();
@ -57,16 +66,39 @@ public int hashCode() {
@Override
public boolean equals(Object other) {
if (other == null)
if (!(other instanceof RefreshServiceAclsRequest)) {
return false;
if (other.getClass().isAssignableFrom(this.getClass())) {
return this.getProto().equals(this.getClass().cast(other).getProto());
}
return false;
RefreshServiceAclsRequestPBImpl otherImpl = this.getClass().cast(other);
return new EqualsBuilder()
.append(this.getProto(), otherImpl.getProto())
.isEquals();
}
@Override
public String toString() {
return TextFormat.shortDebugString(getProto());
}
@Override
public String getSubClusterId() {
RefreshServiceAclsRequestProtoOrBuilder p = viaProto ? proto : builder;
boolean hasSubClusterId = p.hasSubClusterId();
if (hasSubClusterId) {
return p.getSubClusterId();
}
return null;
}
@Override
public void setSubClusterId(String subClusterId) {
maybeInitBuilder();
if (subClusterId == null) {
builder.clearSubClusterId();
return;
}
builder.setSubClusterId(subClusterId);
}
}

View File

@ -22,6 +22,7 @@
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.test.GenericTestUtils;
@ -55,8 +56,13 @@
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsResponse;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
@ -789,6 +795,7 @@ protected void serviceStop() {
@Override
protected AdminService createAdminService() {
RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
return new AdminService(this) {
@Override
protected void startServer() {
@ -799,6 +806,19 @@ protected void startServer() {
protected void stopServer() {
// don't do anything
}
@Override
public RefreshServiceAclsResponse refreshServiceAcls(RefreshServiceAclsRequest request)
throws YarnException, IOException {
Configuration config = this.getConfig();
boolean authorization =
config.getBoolean(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, false);
if (!authorization) {
throw RPCUtil.getRemoteException(new IOException("Service Authorization (" +
CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION + ") not enabled."));
}
return recordFactory.newRecordInstance(RefreshServiceAclsResponse.class);
}
};
}

View File

@ -147,6 +147,10 @@ public final class RouterMetrics {
private MutableGaugeInt numRefreshSuperUserGroupsConfigurationFailedRetrieved;
@Metric("# of refreshUserToGroupsMappings failed to be retrieved")
private MutableGaugeInt numRefreshUserToGroupsMappingsFailedRetrieved;
@Metric("# of refreshAdminAcls failed to be retrieved")
private MutableGaugeInt numRefreshAdminAclsFailedRetrieved;
@Metric("# of refreshServiceAcls failed to be retrieved")
private MutableGaugeInt numRefreshServiceAclsFailedRetrieved;
@Metric("# of replaceLabelsOnNodes failed to be retrieved")
private MutableGaugeInt numReplaceLabelsOnNodesFailedRetrieved;
@Metric("# of replaceLabelsOnNode failed to be retrieved")
@ -267,6 +271,10 @@ public final class RouterMetrics {
private MutableRate totalSucceededReplaceLabelsOnNodeRetrieved;
@Metric("Total number of successful Retrieved GetSchedulerInfo and latency(ms)")
private MutableRate totalSucceededGetSchedulerInfoRetrieved;
@Metric("Total number of successful Retrieved RefreshAdminAcls and latency(ms)")
private MutableRate totalSucceededRefreshAdminAclsRetrieved;
@Metric("Total number of successful Retrieved RefreshServiceAcls and latency(ms)")
private MutableRate totalSucceededRefreshServiceAclsRetrieved;
@Metric("Total number of successful Retrieved AddToClusterNodeLabels and latency(ms)")
private MutableRate totalSucceededAddToClusterNodeLabelsRetrieved;
@Metric("Total number of successful Retrieved RemoveFromClusterNodeLabels and latency(ms)")
@ -328,6 +336,8 @@ public final class RouterMetrics {
private MutableQuantiles getSchedulerInfoRetrievedLatency;
private MutableQuantiles refreshSuperUserGroupsConfLatency;
private MutableQuantiles refreshUserToGroupsMappingsLatency;
private MutableQuantiles refreshAdminAclsLatency;
private MutableQuantiles refreshServiceAclsLatency;
private MutableQuantiles replaceLabelsOnNodesLatency;
private MutableQuantiles replaceLabelsOnNodeLatency;
private MutableQuantiles addToClusterNodeLabelsLatency;
@ -524,6 +534,12 @@ private RouterMetrics() {
refreshUserToGroupsMappingsLatency = registry.newQuantiles("refreshUserToGroupsMappingsLatency",
"latency of refresh user to groups mappings timeouts", "ops", "latency", 10);
refreshAdminAclsLatency = registry.newQuantiles("refreshAdminAclsLatency",
"latency of refresh admin acls timeouts", "ops", "latency", 10);
refreshServiceAclsLatency = registry.newQuantiles("refreshServiceAclsLatency",
"latency of refresh service acls timeouts", "ops", "latency", 10);
replaceLabelsOnNodesLatency = registry.newQuantiles("replaceLabelsOnNodesLatency",
"latency of replace labels on nodes timeouts", "ops", "latency", 10);
@ -811,6 +827,16 @@ public long getNumSucceededGetSchedulerInfoRetrieved() {
return totalSucceededGetSchedulerInfoRetrieved.lastStat().numSamples();
}
@VisibleForTesting
public long getNumSucceededRefreshAdminAclsRetrieved() {
return totalSucceededRefreshAdminAclsRetrieved.lastStat().numSamples();
}
@VisibleForTesting
public long getNumSucceededRefreshServiceAclsRetrieved() {
return totalSucceededRefreshServiceAclsRetrieved.lastStat().numSamples();
}
@VisibleForTesting
public long getNumSucceededAddToClusterNodeLabelsRetrieved() {
return totalSucceededAddToClusterNodeLabelsRetrieved.lastStat().numSamples();
@ -1091,6 +1117,16 @@ public double getLatencySucceededGetSchedulerInfoRetrieved() {
return totalSucceededGetSchedulerInfoRetrieved.lastStat().mean();
}
@VisibleForTesting
public double getLatencySucceededRefreshAdminAclsRetrieved() {
return totalSucceededRefreshAdminAclsRetrieved.lastStat().mean();
}
@VisibleForTesting
public double getLatencySucceededRefreshServiceAclsRetrieved() {
return totalSucceededRefreshServiceAclsRetrieved.lastStat().mean();
}
@VisibleForTesting
public double getLatencySucceededAddToClusterNodeLabelsRetrieved() {
return totalSucceededAddToClusterNodeLabelsRetrieved.lastStat().mean();
@ -1322,6 +1358,14 @@ public int getNumRefreshUserToGroupsMappingsFailedRetrieved() {
return numRefreshUserToGroupsMappingsFailedRetrieved.value();
}
public int getNumRefreshAdminAclsFailedRetrieved() {
return numRefreshAdminAclsFailedRetrieved.value();
}
public int getNumRefreshServiceAclsFailedRetrieved() {
return numRefreshServiceAclsFailedRetrieved.value();
}
public int getNumReplaceLabelsOnNodesFailedRetrieved() {
return numReplaceLabelsOnNodesFailedRetrieved.value();
}
@ -1621,6 +1665,16 @@ public void succeededGetSchedulerInfoRetrieved(long duration) {
getSchedulerInfoRetrievedLatency.add(duration);
}
public void succeededRefreshAdminAclsRetrieved(long duration) {
totalSucceededRefreshAdminAclsRetrieved.add(duration);
refreshAdminAclsLatency.add(duration);
}
public void succeededRefreshServiceAclsRetrieved(long duration) {
totalSucceededRefreshServiceAclsRetrieved.add(duration);
refreshServiceAclsLatency.add(duration);
}
public void succeededAddToClusterNodeLabelsRetrieved(long duration) {
totalSucceededAddToClusterNodeLabelsRetrieved.add(duration);
addToClusterNodeLabelsLatency.add(duration);
@ -1835,6 +1889,14 @@ public void incrRefreshUserToGroupsMappingsFailedRetrieved() {
numRefreshUserToGroupsMappingsFailedRetrieved.incr();
}
public void incrRefreshAdminAclsFailedRetrieved() {
numRefreshAdminAclsFailedRetrieved.incr();
}
public void incrRefreshServiceAclsFailedRetrieved() {
numRefreshServiceAclsFailedRetrieved.incr();
}
public void incrAddToClusterNodeLabelsFailedRetrieved() {
numAddToClusterNodeLabelsFailedRetrieved.incr();
}

View File

@ -372,13 +372,67 @@ public RefreshUserToGroupsMappingsResponse refreshUserToGroupsMappings(
@Override
public RefreshAdminAclsResponse refreshAdminAcls(RefreshAdminAclsRequest request)
throws YarnException, IOException {
throw new NotImplementedException();
// parameter verification.
if (request == null) {
routerMetrics.incrRefreshAdminAclsFailedRetrieved();
RouterServerUtil.logAndThrowException("Missing RefreshAdminAcls request.", null);
}
// call refreshAdminAcls of activeSubClusters.
try {
long startTime = clock.getTime();
RMAdminProtocolMethod remoteMethod = new RMAdminProtocolMethod(
new Class[] {RefreshAdminAclsRequest.class}, new Object[] {request});
String subClusterId = request.getSubClusterId();
Collection<RefreshAdminAclsResponse> refreshAdminAclsResps =
remoteMethod.invokeConcurrent(this, RefreshAdminAclsResponse.class, subClusterId);
if (CollectionUtils.isNotEmpty(refreshAdminAclsResps)) {
long stopTime = clock.getTime();
routerMetrics.succeededRefreshAdminAclsRetrieved(stopTime - startTime);
return RefreshAdminAclsResponse.newInstance();
}
} catch (YarnException e) {
routerMetrics.incrRefreshAdminAclsFailedRetrieved();
RouterServerUtil.logAndThrowException(e,
"Unable to refreshAdminAcls due to exception. " + e.getMessage());
}
routerMetrics.incrRefreshAdminAclsFailedRetrieved();
throw new YarnException("Unable to refreshAdminAcls.");
}
@Override
public RefreshServiceAclsResponse refreshServiceAcls(RefreshServiceAclsRequest request)
throws YarnException, IOException {
throw new NotImplementedException();
// parameter verification.
if (request == null) {
routerMetrics.incrRefreshServiceAclsFailedRetrieved();
RouterServerUtil.logAndThrowException("Missing RefreshServiceAcls request.", null);
}
// call refreshAdminAcls of activeSubClusters.
try {
long startTime = clock.getTime();
RMAdminProtocolMethod remoteMethod = new RMAdminProtocolMethod(
new Class[]{RefreshServiceAclsRequest.class}, new Object[]{request});
String subClusterId = request.getSubClusterId();
Collection<RefreshServiceAclsResponse> refreshServiceAclsResps =
remoteMethod.invokeConcurrent(this, RefreshServiceAclsResponse.class, subClusterId);
if (CollectionUtils.isNotEmpty(refreshServiceAclsResps)) {
long stopTime = clock.getTime();
routerMetrics.succeededRefreshServiceAclsRetrieved(stopTime - startTime);
return RefreshServiceAclsResponse.newInstance();
}
} catch (YarnException e) {
routerMetrics.incrRefreshServiceAclsFailedRetrieved();
RouterServerUtil.logAndThrowException(e,
"Unable to refreshAdminAcls due to exception. " + e.getMessage());
}
routerMetrics.incrRefreshServiceAclsFailedRetrieved();
throw new YarnException("Unable to refreshServiceAcls.");
}
@Override

View File

@ -534,6 +534,16 @@ public void getRenewDelegationTokenFailed() {
metrics.incrRenewDelegationTokenFailedRetrieved();
}
public void getRefreshAdminAclsFailedRetrieved() {
LOG.info("Mocked: failed refreshAdminAcls call");
metrics.incrRefreshAdminAclsFailedRetrieved();
}
public void getRefreshServiceAclsFailedRetrieved() {
LOG.info("Mocked: failed refreshServiceAcls call");
metrics.incrRefreshServiceAclsFailedRetrieved();
}
public void getReplaceLabelsOnNodesFailed() {
LOG.info("Mocked: failed replaceLabelsOnNodes call");
metrics.incrReplaceLabelsOnNodesFailedRetrieved();
@ -789,6 +799,16 @@ public void getRenewDelegationTokenRetrieved(long duration) {
metrics.succeededRenewDelegationTokenRetrieved(duration);
}
public void getRefreshAdminAclsRetrieved(long duration) {
LOG.info("Mocked: successful RefreshAdminAcls call with duration {}", duration);
metrics.succeededRefreshAdminAclsRetrieved(duration);
}
public void getRefreshServiceAclsRetrieved(long duration) {
LOG.info("Mocked: successful RefreshServiceAcls call with duration {}", duration);
metrics.succeededRefreshServiceAclsRetrieved(duration);
}
public void getNumSucceededReplaceLabelsOnNodesRetrieved(long duration) {
LOG.info("Mocked: successful ReplaceLabelsOnNodes call with duration {}", duration);
metrics.succeededReplaceLabelsOnNodesRetrieved(duration);
@ -1653,6 +1673,52 @@ public void testRenewDelegationTokenRetrievedFailed() {
metrics.getRenewDelegationTokenFailedRetrieved());
}
@Test
public void testRefreshAdminAclsRetrieved() {
long totalGoodBefore = metrics.getNumSucceededRefreshAdminAclsRetrieved();
goodSubCluster.getRefreshAdminAclsRetrieved(150);
Assert.assertEquals(totalGoodBefore + 1,
metrics.getNumSucceededRefreshAdminAclsRetrieved());
Assert.assertEquals(150,
metrics.getLatencySucceededRefreshAdminAclsRetrieved(), ASSERT_DOUBLE_DELTA);
goodSubCluster.getRefreshAdminAclsRetrieved(300);
Assert.assertEquals(totalGoodBefore + 2,
metrics.getNumSucceededRefreshAdminAclsRetrieved());
Assert.assertEquals(225,
metrics.getLatencySucceededRefreshAdminAclsRetrieved(), ASSERT_DOUBLE_DELTA);
}
@Test
public void testRefreshAdminAclsRetrievedFailed() {
long totalBadBefore = metrics.getNumRefreshAdminAclsFailedRetrieved();
badSubCluster.getRefreshAdminAclsFailedRetrieved();
Assert.assertEquals(totalBadBefore + 1,
metrics.getNumRefreshAdminAclsFailedRetrieved());
}
@Test
public void testRefreshServiceAclsRetrieved() {
long totalGoodBefore = metrics.getNumSucceededRefreshServiceAclsRetrieved();
goodSubCluster.getRefreshServiceAclsRetrieved(150);
Assert.assertEquals(totalGoodBefore + 1,
metrics.getNumSucceededRefreshServiceAclsRetrieved());
Assert.assertEquals(150,
metrics.getLatencySucceededRefreshServiceAclsRetrieved(), ASSERT_DOUBLE_DELTA);
goodSubCluster.getRefreshServiceAclsRetrieved(300);
Assert.assertEquals(totalGoodBefore + 2,
metrics.getNumSucceededRefreshServiceAclsRetrieved());
Assert.assertEquals(225,
metrics.getLatencySucceededRefreshServiceAclsRetrieved(), ASSERT_DOUBLE_DELTA);
}
@Test
public void testRefreshServiceAclsRetrievedFailed() {
long totalBadBefore = metrics.getNumRefreshServiceAclsFailedRetrieved();
badSubCluster.getRefreshServiceAclsFailedRetrieved();
Assert.assertEquals(totalBadBefore + 1,
metrics.getNumRefreshServiceAclsFailedRetrieved());
}
@Test
public void testReplaceLabelsOnNodesRetrieved() {
long totalGoodBefore = metrics.getNumSucceededReplaceLabelsOnNodesRetrieved();

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.router.rmadmin;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.test.LambdaTestUtils;
import org.apache.hadoop.yarn.api.records.DecommissionType;
@ -30,6 +31,10 @@
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsResponse;
import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
@ -113,6 +118,8 @@ protected YarnConfiguration createConfiguration() {
config.set(YarnConfiguration.ROUTER_RMADMIN_INTERCEPTOR_CLASS_PIPELINE,
mockPassThroughInterceptorClass + "," + mockPassThroughInterceptorClass + "," +
TestFederationRMAdminInterceptor.class.getName());
config.setBoolean(
CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, true);
return config;
}
@ -259,4 +266,58 @@ public void testSC1RefreshUserToGroupsMappings() throws Exception {
"subClusterId = SC-NON is not an active subCluster.",
() -> interceptor.refreshUserToGroupsMappings(request1));
}
@Test
public void testRefreshAdminAcls() throws Exception {
// null request.
LambdaTestUtils.intercept(YarnException.class, "Missing RefreshAdminAcls request.",
() -> interceptor.refreshAdminAcls(null));
// normal request.
RefreshAdminAclsRequest request = RefreshAdminAclsRequest.newInstance();
RefreshAdminAclsResponse response = interceptor.refreshAdminAcls(request);
assertNotNull(response);
}
@Test
public void testSC1RefreshAdminAcls() throws Exception {
// case 1, test the existing subCluster (SC-1).
String existSubCluster = "SC-1";
RefreshAdminAclsRequest request = RefreshAdminAclsRequest.newInstance(existSubCluster);
RefreshAdminAclsResponse response = interceptor.refreshAdminAcls(request);
assertNotNull(response);
// case 2, test the non-exist subCluster.
String notExistsSubCluster = "SC-NON";
RefreshAdminAclsRequest request1 = RefreshAdminAclsRequest.newInstance(notExistsSubCluster);
LambdaTestUtils.intercept(Exception.class, "subClusterId = SC-NON is not an active subCluster.",
() -> interceptor.refreshAdminAcls(request1));
}
@Test
public void testRefreshServiceAcls() throws Exception {
// null request.
LambdaTestUtils.intercept(YarnException.class, "Missing RefreshServiceAcls request.",
() -> interceptor.refreshServiceAcls(null));
// normal request.
RefreshServiceAclsRequest request = RefreshServiceAclsRequest.newInstance();
RefreshServiceAclsResponse response = interceptor.refreshServiceAcls(request);
assertNotNull(response);
}
@Test
public void testSC1RefreshServiceAcls() throws Exception {
// case 1, test the existing subCluster (SC-1).
String existSubCluster = "SC-1";
RefreshServiceAclsRequest request = RefreshServiceAclsRequest.newInstance(existSubCluster);
RefreshServiceAclsResponse response = interceptor.refreshServiceAcls(request);
assertNotNull(response);
// case 2, test the non-exist subCluster.
String notExistsSubCluster = "SC-NON";
RefreshServiceAclsRequest request1 = RefreshServiceAclsRequest.newInstance(notExistsSubCluster);
LambdaTestUtils.intercept(Exception.class, "subClusterId = SC-NON is not an active subCluster.",
() -> interceptor.refreshServiceAcls(request1));
}
}

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.router.rmadmin;
import org.apache.commons.collections.MapUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
@ -35,6 +36,8 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.RM_CLUSTER_ID;
public class TestableFederationRMAdminInterceptor extends FederationRMAdminInterceptor {
// Record log information
@ -55,11 +58,13 @@ protected ResourceManagerAdministrationProtocol getAdminRMProxyForSubCluster(
if (mockRMs.containsKey(subClusterId)) {
mockRM = mockRMs.get(subClusterId);
} else {
mockRM = new MockRM();
YarnConfiguration config = new YarnConfiguration(super.getConf());
config.set(RM_CLUSTER_ID, "subcluster." + subClusterId);
mockRM = new MockRM(config);
if (badSubCluster.contains(subClusterId)) {
return new MockRMAdminBadService(mockRM);
}
mockRM.init(super.getConf());
mockRM.init(config);
mockRM.start();
mockRMs.put(subClusterId, mockRM);
}