diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationMethodWrapper.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationMethodWrapper.java new file mode 100644 index 0000000000..9b8944049a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationMethodWrapper.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.federation.utils; + +import org.apache.hadoop.yarn.exceptions.YarnException; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; + + +public abstract class FederationMethodWrapper { + + /** + * List of parameters: static and dynamic values, matching types. + */ + private Object[] params; + + /** + * List of method parameter types, matching the parameters. + */ + private Class<?>[] types; + + /** + * String name of the method. + */ + private String methodName; + + public FederationMethodWrapper(Class<?>[] pTypes, Object... pParams) + throws IOException { + if (pParams.length != pTypes.length) { + throw new IOException("Invalid parameters for method."); + } + this.params = pParams; + this.types = Arrays.copyOf(pTypes, pTypes.length); + } + + public Object[] getParams() { + return Arrays.copyOf(this.params, this.params.length); + } + + public String getMethodName() { + return methodName; + } + + public void setMethodName(String methodName) { + this.methodName = methodName; + } + + /** + * Get the calling types for this method. + * + * @return An array of calling types.
+ */ + public Class<?>[] getTypes() { + return Arrays.copyOf(this.types, this.types.length); + } + + protected abstract <R> Collection<R> invokeConcurrent(Class<R> clazz) throws YarnException; +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java index a74b50b64f..b03aeda38b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java @@ -121,6 +121,8 @@ public final class RouterMetrics { private MutableGaugeInt numGetAppTimeoutFailedRetrieved; @Metric("# of getAppTimeouts failed to be retrieved") private MutableGaugeInt numGetAppTimeoutsFailedRetrieved; + @Metric("# of refreshQueues failed to be retrieved") + private MutableGaugeInt numRefreshQueuesFailedRetrieved; @Metric("# of getRMNodeLabels failed to be retrieved") private MutableGaugeInt numGetRMNodeLabelsFailedRetrieved; @Metric("# of checkUserAccessToQueue failed to be retrieved") @@ -207,6 +209,8 @@ public final class RouterMetrics { private MutableRate totalSucceededGetAppTimeoutRetrieved; @Metric("Total number of successful Retrieved GetAppTimeouts and latency(ms)") private MutableRate totalSucceededGetAppTimeoutsRetrieved; + @Metric("Total number of successful Retrieved RefreshQueues and latency(ms)") + private MutableRate totalSucceededRefreshQueuesRetrieved; @Metric("Total number of successful Retrieved GetRMNodeLabels and latency(ms)") private MutableRate totalSucceededGetRMNodeLabelsRetrieved; @Metric("Total number of successful Retrieved CheckUserAccessToQueue and latency(ms)") @@ -255,6 +259,7 @@ public final class RouterMetrics { private MutableQuantiles getUpdateQueueLatency; private MutableQuantiles getAppTimeoutLatency; private MutableQuantiles getAppTimeoutsLatency; + private MutableQuantiles getRefreshQueuesLatency; private MutableQuantiles getRMNodeLabelsLatency; private MutableQuantiles checkUserAccessToQueueLatency; @@ -410,6 +415,9 @@ private RouterMetrics() { getAppTimeoutsLatency = registry.newQuantiles("getAppTimeoutsLatency", "latency of get apptimeouts timeouts", "ops", "latency", 10); + getRefreshQueuesLatency = registry.newQuantiles("getRefreshQueuesLatency", + "latency of refresh queues timeouts", "ops", "latency", 10); + getRMNodeLabelsLatency = registry.newQuantiles("getRMNodeLabelsLatency", "latency of get rmnodelabels timeouts", "ops", "latency", 10); @@ -636,6 +644,11 @@ public long getNumSucceededGetAppTimeoutsRetrieved() { return totalSucceededGetAppTimeoutsRetrieved.lastStat().numSamples(); } + @VisibleForTesting + public long getNumSucceededRefreshQueuesRetrieved() { + return totalSucceededRefreshQueuesRetrieved.lastStat().numSamples(); + } + @VisibleForTesting public long getNumSucceededGetRMNodeLabelsRetrieved() { return totalSucceededGetRMNodeLabelsRetrieved.lastStat().numSamples(); @@ -846,6 +859,11 @@ public double getLatencySucceededGetAppTimeoutsRetrieved() { return totalSucceededGetAppTimeoutsRetrieved.lastStat().mean(); } + @VisibleForTesting + public double getLatencySucceededRefreshQueuesRetrieved() { + return totalSucceededRefreshQueuesRetrieved.lastStat().mean(); + } + @VisibleForTesting
public double getLatencySucceededGetRMNodeLabelsRetrieved() { return totalSucceededGetRMNodeLabelsRetrieved.lastStat().mean(); @@ -1037,6 +1055,11 @@ public int getAppTimeoutsFailedRetrieved() { return numGetAppTimeoutsFailedRetrieved.value(); } + + public int getRefreshQueuesFailedRetrieved() { + return numRefreshQueuesFailedRetrieved.value(); + } + public int getRMNodeLabelsFailedRetrieved() { return numGetRMNodeLabelsFailedRetrieved.value(); } @@ -1245,6 +1268,11 @@ public void succeededGetAppTimeoutsRetrieved(long duration) { getAppTimeoutsLatency.add(duration); } + public void succeededRefreshQueuesRetrieved(long duration) { + totalSucceededRefreshQueuesRetrieved.add(duration); + getRefreshQueuesLatency.add(duration); + } + public void succeededGetRMNodeLabelsRetrieved(long duration) { totalSucceededGetRMNodeLabelsRetrieved.add(duration); getRMNodeLabelsLatency.add(duration); @@ -1415,6 +1443,10 @@ public void incrGetAppTimeoutsFailedRetrieved() { numGetAppTimeoutsFailedRetrieved.incr(); } + public void incrRefreshQueuesFailedRetrieved() { + numRefreshQueuesFailedRetrieved.incr(); + } + public void incrGetRMNodeLabelsFailedRetrieved() { numGetRMNodeLabelsFailedRetrieved.incr(); } @@ -1422,4 +1454,4 @@ public void incrGetRMNodeLabelsFailedRetrieved() { public void incrCheckUserAccessToQueueFailedRetrieved() { numCheckUserAccessToQueueFailedRetrieved.incr(); } -} +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java index f55a45d4ba..6dd49daa4e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java @@ -25,6 +25,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -299,6 +300,28 @@ public static RuntimeException logAndReturnRunTimeException( return logAndReturnRunTimeException(null, errMsgFormat, args); } + /** + * Throws a YarnRuntimeException due to an error. + * + * @param t the throwable raised in the called class. + * @param errMsgFormat the error message format string. + * @param args referenced by the format specifiers in the format string. + * @return YarnRuntimeException + */ + @Public + @Unstable + public static YarnRuntimeException logAndReturnYarnRunTimeException( + Throwable t, String errMsgFormat, Object... args) { + String msg = String.format(errMsgFormat, args); + if (t != null) { + LOG.error(msg, t); + return new YarnRuntimeException(msg, t); + } else { + LOG.error(msg); + return new YarnRuntimeException(msg); + } + } + + /** + * Check applicationId is accurate.
* @@ -491,4 +514,27 @@ public static SubClusterId getRandomActiveSubCluster( // Randomly choose a SubCluster return subClusterIds.get(rand.nextInt(subClusterIds.size())); } + + public static UserGroupInformation setupUser(final String userName) { + UserGroupInformation user = null; + try { + // If userName is empty, we will return UserGroupInformation.getCurrentUser. + // Do not create a proxy user if user name matches the user name on + // current UGI + if (userName == null || userName.trim().isEmpty()) { + user = UserGroupInformation.getCurrentUser(); + } else if (UserGroupInformation.isSecurityEnabled()) { + user = UserGroupInformation.createProxyUser(userName, UserGroupInformation.getLoginUser()); + } else if (userName.equalsIgnoreCase(UserGroupInformation.getCurrentUser().getUserName())) { + user = UserGroupInformation.getCurrentUser(); + } else { + user = UserGroupInformation.createProxyUser(userName, + UserGroupInformation.getCurrentUser()); + } + return user; + } catch (IOException e) { + throw RouterServerUtil.logAndReturnYarnRunTimeException(e, + "Error while creating Router RMAdmin Service for user: %s.", userName); + } + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/AbstractClientRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/AbstractClientRequestInterceptor.java index c71be3c46f..10ed71b601 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/AbstractClientRequestInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/AbstractClientRequestInterceptor.java @@ -18,11 +18,9 @@ package org.apache.hadoop.yarn.server.router.clientrm; -import java.io.IOException; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.router.RouterServerUtil; import org.apache.hadoop.yarn.server.router.security.RouterDelegationTokenSecretManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -80,7 +78,7 @@ public Configuration getConf() { */ @Override public void init(String userName) { - setupUser(userName); + this.user = RouterServerUtil.setupUser(userName); if (this.nextInterceptor != null) { this.nextInterceptor.init(userName); } @@ -104,30 +102,6 @@ public ClientRequestInterceptor getNextInterceptor() { return this.nextInterceptor; } - private void setupUser(String userName) { - - try { - // Do not create a proxy user if user name matches the user name on - // current UGI - if (UserGroupInformation.isSecurityEnabled()) { - user = UserGroupInformation.createProxyUser(userName, UserGroupInformation.getLoginUser()); - } else if (userName.equalsIgnoreCase(UserGroupInformation.getCurrentUser().getUserName())) { - user = UserGroupInformation.getCurrentUser(); - } else { - user = UserGroupInformation.createProxyUser(userName, - UserGroupInformation.getCurrentUser()); - } - } catch (IOException e) { - String message = "Error while creating Router ClientRM Service for user:"; - if (user != null) { - message += ", user: " + user; - } - - LOG.info(message); - throw new YarnRuntimeException(message, e); -
} - } - @Override public RouterDelegationTokenSecretManager getTokenSecretManager() { return tokenSecretManager; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/AbstractRMAdminRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/AbstractRMAdminRequestInterceptor.java index f789aa2b47..8b09d69971 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/AbstractRMAdminRequestInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/AbstractRMAdminRequestInterceptor.java @@ -19,6 +19,8 @@ package org.apache.hadoop.yarn.server.router.rmadmin; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.server.router.RouterServerUtil; /** * Implements the {@link RMAdminRequestInterceptor} interface and provides @@ -31,6 +33,9 @@ public abstract class AbstractRMAdminRequestInterceptor private Configuration conf; private RMAdminRequestInterceptor nextInterceptor; + @SuppressWarnings("checkstyle:visibilitymodifier") + protected UserGroupInformation user = null; + /** * Sets the {@link RMAdminRequestInterceptor} in the chain. */ @@ -63,9 +68,10 @@ public Configuration getConf() { * Initializes the {@link RMAdminRequestInterceptor}. */ @Override - public void init(String user) { + public void init(String userName) { + this.user = RouterServerUtil.setupUser(userName); if (this.nextInterceptor != null) { - this.nextInterceptor.init(user); + this.nextInterceptor.init(userName); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/FederationRMAdminInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/FederationRMAdminInterceptor.java new file mode 100644 index 0000000000..4564f8d8b8 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/FederationRMAdminInterceptor.java @@ -0,0 +1,283 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.router.rmadmin; + +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.NotImplementedException; +import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.ipc.StandbyException; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResourcesRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResourcesResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.NodesToAttributesMappingRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.NodesToAttributesMappingResponse; +import org.apache.hadoop.yarn.server.federation.failover.FederationProxyProviderUtil; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; +import 
org.apache.hadoop.yarn.server.router.RouterMetrics; +import org.apache.hadoop.yarn.server.router.RouterServerUtil; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.MonotonicClock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Map; +import java.util.Collection; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.ConcurrentHashMap; + +public class FederationRMAdminInterceptor extends AbstractRMAdminRequestInterceptor { + + private static final Logger LOG = + LoggerFactory.getLogger(FederationRMAdminInterceptor.class); + + private Map<SubClusterId, ResourceManagerAdministrationProtocol> adminRMProxies; + private FederationStateStoreFacade federationFacade; + private final Clock clock = new MonotonicClock(); + private RouterMetrics routerMetrics; + private ThreadPoolExecutor executorService; + private Configuration conf; + + @Override + public void init(String userName) { + super.init(userName); + + int numThreads = getConf().getInt( + YarnConfiguration.ROUTER_USER_CLIENT_THREADS_SIZE, + YarnConfiguration.DEFAULT_ROUTER_USER_CLIENT_THREADS_SIZE); + ThreadFactory threadFactory = new ThreadFactoryBuilder() + .setNameFormat("RPC Router RMAdminClient-" + userName + "-%d ").build(); + + BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(); + this.executorService = new ThreadPoolExecutor(numThreads, numThreads, + 0L, TimeUnit.MILLISECONDS, workQueue, threadFactory); + + federationFacade = FederationStateStoreFacade.getInstance(); + this.conf = this.getConf(); + this.adminRMProxies = new ConcurrentHashMap<>(); + routerMetrics = RouterMetrics.getMetrics(); + } + + @VisibleForTesting + protected ResourceManagerAdministrationProtocol getAdminRMProxyForSubCluster( + SubClusterId subClusterId) throws YarnException { + + if (adminRMProxies.containsKey(subClusterId)) { + return adminRMProxies.get(subClusterId); + } + + ResourceManagerAdministrationProtocol adminRMProxy = null; + try { + boolean serviceAuthEnabled = this.conf.getBoolean( + CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false); + UserGroupInformation realUser = user; + if (serviceAuthEnabled) { + realUser = UserGroupInformation.createProxyUser( + user.getShortUserName(), UserGroupInformation.getLoginUser()); + } + adminRMProxy = FederationProxyProviderUtil.createRMProxy(getConf(), + ResourceManagerAdministrationProtocol.class, subClusterId, realUser); + } catch (Exception e) { + RouterServerUtil.logAndThrowException(e, + "Unable to create the interface to reach the SubCluster %s", subClusterId); + } + adminRMProxies.put(subClusterId, adminRMProxy); + return adminRMProxy; + } + + @Override + public void setNextInterceptor(RMAdminRequestInterceptor next) { + throw new YarnRuntimeException("setNextInterceptor is being called on " + + "FederationRMAdminRequestInterceptor, which should be the last one " + + "in the chain. Check if the interceptor pipeline configuration " + + "is correct"); + } + + @Override + public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request) + throws StandbyException, YarnException, IOException { + + // parameter verification. + if (request == null) { + routerMetrics.incrRefreshQueuesFailedRetrieved(); + RouterServerUtil.logAndThrowException("Missing RefreshQueues request.", null); + } + + // call refreshQueues of activeSubClusters.
+ try { + long startTime = clock.getTime(); + RMAdminProtocolMethod remoteMethod = new RMAdminProtocolMethod( + new Class[] {RefreshQueuesRequest.class}, new Object[] {request}); + + Collection<RefreshQueuesResponse> refreshQueueResps = + remoteMethod.invokeConcurrent(this, RefreshQueuesResponse.class); + + // If refreshQueueResps is not empty, the call succeeded on every active + // sub-cluster, so a new RefreshQueuesResponse can be constructed and returned. + if (CollectionUtils.isNotEmpty(refreshQueueResps)) { + long stopTime = clock.getTime(); + routerMetrics.succeededRefreshQueuesRetrieved(stopTime - startTime); + return RefreshQueuesResponse.newInstance(); + } + } catch (Exception e) { + routerMetrics.incrRefreshQueuesFailedRetrieved(); + RouterServerUtil.logAndThrowException("Unable to refreshQueues due to exception.", e); + } + + routerMetrics.incrRefreshQueuesFailedRetrieved(); + throw new YarnException("Unable to refreshQueues."); + } + + @Override + public RefreshNodesResponse refreshNodes(RefreshNodesRequest request) + throws StandbyException, YarnException, IOException { + throw new NotImplementedException(); + } + + @Override + public RefreshSuperUserGroupsConfigurationResponse refreshSuperUserGroupsConfiguration( + RefreshSuperUserGroupsConfigurationRequest request) + throws StandbyException, YarnException, IOException { + throw new NotImplementedException(); + } + + @Override + public RefreshUserToGroupsMappingsResponse refreshUserToGroupsMappings( + RefreshUserToGroupsMappingsRequest request) + throws StandbyException, YarnException, IOException { + throw new NotImplementedException(); + } + + @Override + public RefreshAdminAclsResponse refreshAdminAcls(RefreshAdminAclsRequest request) + throws YarnException, IOException { + throw new NotImplementedException(); + } + + @Override + public RefreshServiceAclsResponse refreshServiceAcls(RefreshServiceAclsRequest request) + throws YarnException, IOException { + throw new NotImplementedException(); + } + + @Override + public UpdateNodeResourceResponse updateNodeResource(UpdateNodeResourceRequest request) + throws YarnException, IOException { + throw new NotImplementedException(); + } + + @Override + public RefreshNodesResourcesResponse refreshNodesResources(RefreshNodesResourcesRequest request) + throws YarnException, IOException { + throw new NotImplementedException(); + } + + @Override + public AddToClusterNodeLabelsResponse addToClusterNodeLabels( + AddToClusterNodeLabelsRequest request) + throws YarnException, IOException { + throw new NotImplementedException(); + } + + @Override + public RemoveFromClusterNodeLabelsResponse removeFromClusterNodeLabels( + RemoveFromClusterNodeLabelsRequest request) + throws YarnException, IOException { + throw new NotImplementedException(); + } + + @Override + public ReplaceLabelsOnNodeResponse replaceLabelsOnNode(ReplaceLabelsOnNodeRequest request) + throws YarnException, IOException { + throw new NotImplementedException(); + } + + @Override + public CheckForDecommissioningNodesResponse checkForDecommissioningNodes( + CheckForDecommissioningNodesRequest checkForDecommissioningNodesRequest) + throws YarnException, IOException { + throw new NotImplementedException(); + } + + @Override + public RefreshClusterMaxPriorityResponse refreshClusterMaxPriority( + RefreshClusterMaxPriorityRequest request) + throws YarnException, IOException { + throw new NotImplementedException(); + } + + @Override + public NodesToAttributesMappingResponse mapAttributesToNodes( + NodesToAttributesMappingRequest request) +
throws YarnException, IOException { + throw new NotImplementedException(); + } + + @Override + public String[] getGroupsForUser(String user) throws IOException { + return new String[0]; + } + + @VisibleForTesting + public FederationStateStoreFacade getFederationFacade() { + return federationFacade; + } + + @VisibleForTesting + public ThreadPoolExecutor getExecutorService() { + return executorService; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/RMAdminProtocolMethod.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/RMAdminProtocolMethod.java new file mode 100644 index 0000000000..e1aa806ff8 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/RMAdminProtocolMethod.java @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.router.rmadmin; + +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; +import org.apache.hadoop.yarn.server.federation.utils.FederationMethodWrapper; +import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.lang.reflect.Method; +import java.util.Collection; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.List; +import java.util.ArrayList; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadPoolExecutor; + + +/** + * Class to define admin method, params and arguments. + */ +public class RMAdminProtocolMethod extends FederationMethodWrapper { + + private static final Logger LOG = + LoggerFactory.getLogger(RMAdminProtocolMethod.class); + + private FederationStateStoreFacade federationFacade; + private FederationRMAdminInterceptor rmAdminInterceptor; + private Configuration configuration; + + public RMAdminProtocolMethod(Class[] pTypes, Object... 
pParams) + throws IOException { + super(pTypes, pParams); + } + + public <R> Collection<R> invokeConcurrent(FederationRMAdminInterceptor interceptor, + Class<R> clazz) throws YarnException { + this.rmAdminInterceptor = interceptor; + this.federationFacade = FederationStateStoreFacade.getInstance(); + this.configuration = interceptor.getConf(); + return invokeConcurrent(clazz); + } + + @Override + protected <R> Collection<R> invokeConcurrent(Class<R> clazz) throws YarnException { + String methodName = Thread.currentThread().getStackTrace()[3].getMethodName(); + this.setMethodName(methodName); + + ThreadPoolExecutor executorService = rmAdminInterceptor.getExecutorService(); + + // Get Active SubClusters + Map<SubClusterId, SubClusterInfo> subClusterInfo = + federationFacade.getSubClusters(true); + Collection<SubClusterId> subClusterIds = subClusterInfo.keySet(); + + List<Callable<Pair<SubClusterId, Object>>> callables = new ArrayList<>(); + List<Future<Pair<SubClusterId, Object>>> futures = new ArrayList<>(); + Map<SubClusterId, Exception> exceptions = new TreeMap<>(); + + // Generate parallel Callable tasks + for (SubClusterId subClusterId : subClusterIds) { + callables.add(() -> { + ResourceManagerAdministrationProtocol protocol = + rmAdminInterceptor.getAdminRMProxyForSubCluster(subClusterId); + Class<?>[] types = this.getTypes(); + Object[] params = this.getParams(); + Method method = ResourceManagerAdministrationProtocol.class.getMethod(methodName, types); + Object result = method.invoke(protocol, params); + return Pair.of(subClusterId, result); + }); + } + + // Get results from multiple threads + Map<SubClusterId, R> results = new TreeMap<>(); + try { + futures.addAll(executorService.invokeAll(callables)); + futures.stream().forEach(future -> { + SubClusterId subClusterId = null; + try { + Pair<SubClusterId, Object> pair = future.get(); + subClusterId = pair.getKey(); + Object result = pair.getValue(); + results.put(subClusterId, clazz.cast(result)); + } catch (InterruptedException | ExecutionException e) { + Throwable cause = e.getCause(); + LOG.error("Cannot execute {} on {}: {}", methodName, subClusterId, cause.getMessage()); + exceptions.put(subClusterId, e); + } + }); + } catch (InterruptedException e) { + throw new YarnException("invokeConcurrent failed.", e); + } + + // Every sub-cluster must return a result for the call to be considered + // successful; otherwise an exception is thrown.
+ if (exceptions != null && !exceptions.isEmpty()) { + Set<SubClusterId> subClusterIdSets = exceptions.keySet(); + throw new YarnException("invokeConcurrent failed. An exception occurred in subClusterIds = " + + StringUtils.join(subClusterIdSets, ",")); + } + + // return result + return results.values(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/BaseRouterRMAdminTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/BaseRouterRMAdminTest.java index a8d730fbe8..33cda8751d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/BaseRouterRMAdminTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/BaseRouterRMAdminTest.java @@ -88,27 +88,36 @@ protected MockRouterRMAdminService getRouterRMAdminService() { @Before public void setUp() { - this.conf = new YarnConfiguration(); + this.conf = createConfiguration(); + this.dispatcher = new AsyncDispatcher(); + this.dispatcher.init(conf); + this.dispatcher.start(); + this.rmAdminService = createAndStartRouterRMAdminService(); + DefaultMetricsSystem.setMiniClusterMode(true); + } + + protected Configuration getConf() { + return this.conf; + } + + public void setUpConfig() { + this.conf = createConfiguration(); + } + + protected Configuration createConfiguration() { + YarnConfiguration config = new YarnConfiguration(); String mockPassThroughInterceptorClass = PassThroughRMAdminRequestInterceptor.class.getName(); // Create a request interceptor pipeline for testing. The last one in the // chain will call the mock resource manager.
The others in the chain will // simply forward it to the next one in the chain - this.conf.set(YarnConfiguration.ROUTER_RMADMIN_INTERCEPTOR_CLASS_PIPELINE, - mockPassThroughInterceptorClass + "," + mockPassThroughInterceptorClass - + "," + mockPassThroughInterceptorClass + "," - + MockRMAdminRequestInterceptor.class.getName()); + config.set(YarnConfiguration.ROUTER_RMADMIN_INTERCEPTOR_CLASS_PIPELINE, + mockPassThroughInterceptorClass + "," + mockPassThroughInterceptorClass + "," + + mockPassThroughInterceptorClass + "," + MockRMAdminRequestInterceptor.class.getName()); - this.conf.setInt(YarnConfiguration.ROUTER_PIPELINE_CACHE_MAX_SIZE, - TEST_MAX_CACHE_SIZE); - - this.dispatcher = new AsyncDispatcher(); - this.dispatcher.init(conf); - this.dispatcher.start(); - this.rmAdminService = createAndStartRouterRMAdminService(); - - DefaultMetricsSystem.setMiniClusterMode(true); + config.setInt(YarnConfiguration.ROUTER_PIPELINE_CACHE_MAX_SIZE, TEST_MAX_CACHE_SIZE); + return config; } @After @@ -142,194 +151,154 @@ public MockRouterRMAdminService() { protected RefreshQueuesResponse refreshQueues(String user) throws IOException, InterruptedException { return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public RefreshQueuesResponse run() throws Exception { - RefreshQueuesRequest req = RefreshQueuesRequest.newInstance(); - RefreshQueuesResponse response = - getRouterRMAdminService().refreshQueues(req); - return response; - } + .doAs((PrivilegedExceptionAction) () -> { + RefreshQueuesRequest req = RefreshQueuesRequest.newInstance(); + RefreshQueuesResponse response = + getRouterRMAdminService().refreshQueues(req); + return response; }); } protected RefreshNodesResponse refreshNodes(String user) throws IOException, InterruptedException { return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public RefreshNodesResponse run() throws Exception { - RefreshNodesRequest req = RefreshNodesRequest.newInstance(); - RefreshNodesResponse response = - getRouterRMAdminService().refreshNodes(req); - return response; - } + .doAs((PrivilegedExceptionAction) () -> { + RefreshNodesRequest req = RefreshNodesRequest.newInstance(); + RefreshNodesResponse response = + getRouterRMAdminService().refreshNodes(req); + return response; }); } protected RefreshSuperUserGroupsConfigurationResponse refreshSuperUserGroupsConfiguration( String user) throws IOException, InterruptedException { return UserGroupInformation.createRemoteUser(user).doAs( - new PrivilegedExceptionAction() { - @Override - public RefreshSuperUserGroupsConfigurationResponse run() - throws Exception { - RefreshSuperUserGroupsConfigurationRequest req = - RefreshSuperUserGroupsConfigurationRequest.newInstance(); - RefreshSuperUserGroupsConfigurationResponse response = - getRouterRMAdminService() - .refreshSuperUserGroupsConfiguration(req); - return response; - } + (PrivilegedExceptionAction) () -> { + RefreshSuperUserGroupsConfigurationRequest req = + RefreshSuperUserGroupsConfigurationRequest.newInstance(); + RefreshSuperUserGroupsConfigurationResponse response = + getRouterRMAdminService() + .refreshSuperUserGroupsConfiguration(req); + return response; }); } protected RefreshUserToGroupsMappingsResponse refreshUserToGroupsMappings( String user) throws IOException, InterruptedException { return UserGroupInformation.createRemoteUser(user).doAs( - new PrivilegedExceptionAction() { - @Override - public RefreshUserToGroupsMappingsResponse run() throws Exception { 
- RefreshUserToGroupsMappingsRequest req = - RefreshUserToGroupsMappingsRequest.newInstance(); - RefreshUserToGroupsMappingsResponse response = - getRouterRMAdminService().refreshUserToGroupsMappings(req); - return response; - } + (PrivilegedExceptionAction) () -> { + RefreshUserToGroupsMappingsRequest req = + RefreshUserToGroupsMappingsRequest.newInstance(); + RefreshUserToGroupsMappingsResponse response = + getRouterRMAdminService().refreshUserToGroupsMappings(req); + return response; }); } protected RefreshAdminAclsResponse refreshAdminAcls(String user) throws IOException, InterruptedException { return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public RefreshAdminAclsResponse run() throws Exception { - RefreshAdminAclsRequest req = RefreshAdminAclsRequest.newInstance(); - RefreshAdminAclsResponse response = - getRouterRMAdminService().refreshAdminAcls(req); - return response; - } + .doAs((PrivilegedExceptionAction) () -> { + RefreshAdminAclsRequest req = RefreshAdminAclsRequest.newInstance(); + RefreshAdminAclsResponse response = + getRouterRMAdminService().refreshAdminAcls(req); + return response; }); } protected RefreshServiceAclsResponse refreshServiceAcls(String user) throws IOException, InterruptedException { return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public RefreshServiceAclsResponse run() throws Exception { - RefreshServiceAclsRequest req = - RefreshServiceAclsRequest.newInstance(); - RefreshServiceAclsResponse response = - getRouterRMAdminService().refreshServiceAcls(req); - return response; - } + .doAs((PrivilegedExceptionAction) () -> { + RefreshServiceAclsRequest req = + RefreshServiceAclsRequest.newInstance(); + RefreshServiceAclsResponse response = + getRouterRMAdminService().refreshServiceAcls(req); + return response; }); } protected UpdateNodeResourceResponse updateNodeResource(String user) throws IOException, InterruptedException { return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public UpdateNodeResourceResponse run() throws Exception { - UpdateNodeResourceRequest req = - UpdateNodeResourceRequest.newInstance(null); - UpdateNodeResourceResponse response = - getRouterRMAdminService().updateNodeResource(req); - return response; - } + .doAs((PrivilegedExceptionAction) () -> { + UpdateNodeResourceRequest req = + UpdateNodeResourceRequest.newInstance(null); + UpdateNodeResourceResponse response = + getRouterRMAdminService().updateNodeResource(req); + return response; }); } protected RefreshNodesResourcesResponse refreshNodesResources(String user) throws IOException, InterruptedException { return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public RefreshNodesResourcesResponse run() throws Exception { - RefreshNodesResourcesRequest req = - RefreshNodesResourcesRequest.newInstance(); - RefreshNodesResourcesResponse response = - getRouterRMAdminService().refreshNodesResources(req); - return response; - } + .doAs((PrivilegedExceptionAction) () -> { + RefreshNodesResourcesRequest req = + RefreshNodesResourcesRequest.newInstance(); + RefreshNodesResourcesResponse response = + getRouterRMAdminService().refreshNodesResources(req); + return response; }); } protected AddToClusterNodeLabelsResponse addToClusterNodeLabels(String user) throws IOException, InterruptedException { return UserGroupInformation.createRemoteUser(user) - .doAs(new 
PrivilegedExceptionAction() { - @Override - public AddToClusterNodeLabelsResponse run() throws Exception { - AddToClusterNodeLabelsRequest req = - AddToClusterNodeLabelsRequest.newInstance(null); - AddToClusterNodeLabelsResponse response = - getRouterRMAdminService().addToClusterNodeLabels(req); - return response; - } + .doAs((PrivilegedExceptionAction) () -> { + AddToClusterNodeLabelsRequest req = + AddToClusterNodeLabelsRequest.newInstance(null); + AddToClusterNodeLabelsResponse response = + getRouterRMAdminService().addToClusterNodeLabels(req); + return response; }); } protected RemoveFromClusterNodeLabelsResponse removeFromClusterNodeLabels( String user) throws IOException, InterruptedException { return UserGroupInformation.createRemoteUser(user).doAs( - new PrivilegedExceptionAction() { - @Override - public RemoveFromClusterNodeLabelsResponse run() throws Exception { - RemoveFromClusterNodeLabelsRequest req = - RemoveFromClusterNodeLabelsRequest.newInstance(null); - RemoveFromClusterNodeLabelsResponse response = - getRouterRMAdminService().removeFromClusterNodeLabels(req); - return response; - } + (PrivilegedExceptionAction) () -> { + RemoveFromClusterNodeLabelsRequest req = + RemoveFromClusterNodeLabelsRequest.newInstance(null); + RemoveFromClusterNodeLabelsResponse response = + getRouterRMAdminService().removeFromClusterNodeLabels(req); + return response; }); } protected ReplaceLabelsOnNodeResponse replaceLabelsOnNode(String user) throws IOException, InterruptedException { return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public ReplaceLabelsOnNodeResponse run() throws Exception { - ReplaceLabelsOnNodeRequest req = ReplaceLabelsOnNodeRequest - .newInstance(new HashMap>()); - ReplaceLabelsOnNodeResponse response = - getRouterRMAdminService().replaceLabelsOnNode(req); - return response; - } + .doAs((PrivilegedExceptionAction) () -> { + ReplaceLabelsOnNodeRequest req = ReplaceLabelsOnNodeRequest + .newInstance(new HashMap>()); + ReplaceLabelsOnNodeResponse response = + getRouterRMAdminService().replaceLabelsOnNode(req); + return response; }); } protected CheckForDecommissioningNodesResponse checkForDecommissioningNodes( String user) throws IOException, InterruptedException { return UserGroupInformation.createRemoteUser(user).doAs( - new PrivilegedExceptionAction() { - @Override - public CheckForDecommissioningNodesResponse run() throws Exception { - CheckForDecommissioningNodesRequest req = - CheckForDecommissioningNodesRequest.newInstance(); - CheckForDecommissioningNodesResponse response = - getRouterRMAdminService().checkForDecommissioningNodes(req); - return response; - } + (PrivilegedExceptionAction) () -> { + CheckForDecommissioningNodesRequest req = + CheckForDecommissioningNodesRequest.newInstance(); + CheckForDecommissioningNodesResponse response = + getRouterRMAdminService().checkForDecommissioningNodes(req); + return response; }); } protected RefreshClusterMaxPriorityResponse refreshClusterMaxPriority( String user) throws IOException, InterruptedException { return UserGroupInformation.createRemoteUser(user).doAs( - new PrivilegedExceptionAction() { - @Override - public RefreshClusterMaxPriorityResponse run() throws Exception { - RefreshClusterMaxPriorityRequest req = - RefreshClusterMaxPriorityRequest.newInstance(); - RefreshClusterMaxPriorityResponse response = - getRouterRMAdminService().refreshClusterMaxPriority(req); - return response; - } + (PrivilegedExceptionAction) () -> { + RefreshClusterMaxPriorityRequest req 
= + RefreshClusterMaxPriorityRequest.newInstance(); + RefreshClusterMaxPriorityResponse response = + getRouterRMAdminService().refreshClusterMaxPriority(req); + return response; }); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/TestFederationRMAdminInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/TestFederationRMAdminInterceptor.java new file mode 100644 index 0000000000..3aa61a68a3 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/TestFederationRMAdminInterceptor.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.router.rmadmin; + +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesRequest; +import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; +import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreTestUtil; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * Extends the FederationRMAdminInterceptor and overrides methods to provide a + * testable implementation of FederationRMAdminInterceptor. 
+ */ +public class TestFederationRMAdminInterceptor extends BaseRouterRMAdminTest { + + private static final Logger LOG = + LoggerFactory.getLogger(TestFederationRMAdminInterceptor.class); + + //////////////////////////////// + // constant information + //////////////////////////////// + private final static String USER_NAME = "test-user"; + private final static int NUM_SUBCLUSTER = 4; + + private TestableFederationRMAdminInterceptor interceptor; + private FederationStateStoreFacade facade; + private MemoryFederationStateStore stateStore; + private FederationStateStoreTestUtil stateStoreUtil; + private List<SubClusterId> subClusters; + + @Override + public void setUp() { + + super.setUpConfig(); + + // Initialize facade & stateStore + stateStore = new MemoryFederationStateStore(); + stateStore.init(this.getConf()); + facade = FederationStateStoreFacade.getInstance(); + facade.reinitialize(stateStore, getConf()); + stateStoreUtil = new FederationStateStoreTestUtil(stateStore); + + // Initialize interceptor + interceptor = new TestableFederationRMAdminInterceptor(); + interceptor.setConf(this.getConf()); + interceptor.init(USER_NAME); + + // Register SubClusters + subClusters = new ArrayList<>(); + try { + for (int i = 0; i < NUM_SUBCLUSTER; i++) { + SubClusterId sc = SubClusterId.newInstance(Integer.toString(i)); + stateStoreUtil.registerSubCluster(sc); + subClusters.add(sc); + } + } catch (YarnException e) { + LOG.error(e.getMessage()); + Assert.fail(); + } + + DefaultMetricsSystem.setMiniClusterMode(true); + } + + @Override + protected YarnConfiguration createConfiguration() { + // Set Enable YarnFederation + YarnConfiguration config = new YarnConfiguration(); + config.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true); + + String mockPassThroughInterceptorClass = + PassThroughRMAdminRequestInterceptor.class.getName(); + + // Create a request interceptor pipeline for testing. The last one in the + // chain will call the mock resource manager. The others in the chain will + // simply forward it to the next one in the chain + config.set(YarnConfiguration.ROUTER_RMADMIN_INTERCEPTOR_CLASS_PIPELINE, + mockPassThroughInterceptorClass + "," + mockPassThroughInterceptorClass + "," + + TestFederationRMAdminInterceptor.class.getName()); + return config; + } + + @Override + public void tearDown() { + interceptor.shutdown(); + super.tearDown(); + } + + @Test + public void testRefreshQueues() throws IOException, YarnException { + RefreshQueuesRequest request = RefreshQueuesRequest.newInstance(); + interceptor.refreshQueues(request); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/TestableFederationRMAdminInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/TestableFederationRMAdminInterceptor.java new file mode 100644 index 0000000000..26f50f88b8 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/TestableFederationRMAdminInterceptor.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.router.rmadmin; + +import org.apache.commons.collections.MapUtils; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.resourcemanager.AdminService; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.ConnectException; +import java.util.Set; +import java.util.HashSet; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public class TestableFederationRMAdminInterceptor extends FederationRMAdminInterceptor { + + // Record log information + private static final Logger LOG = + LoggerFactory.getLogger(TestableFederationRMAdminInterceptor.class); + + // Used to store the relationship between SubClusterId and MockRM + private ConcurrentHashMap<SubClusterId, MockRM> mockRMs = new ConcurrentHashMap<>(); + + // Store bad SubClusters + private Set<SubClusterId> badSubCluster = new HashSet<>(); + + @Override + protected ResourceManagerAdministrationProtocol getAdminRMProxyForSubCluster( + SubClusterId subClusterId) throws YarnException { + MockRM mockRM; + synchronized (this) { + if (mockRMs.containsKey(subClusterId)) { + mockRM = mockRMs.get(subClusterId); + } else { + mockRM = new MockRM(); + if (badSubCluster.contains(subClusterId)) { + return new MockRMAdminBadService(mockRM); + } + mockRM.init(super.getConf()); + mockRM.start(); + mockRMs.put(subClusterId, mockRM); + } + return mockRM.getAdminService(); + } + } + + // This represents an unserviceable SubCluster + private class MockRMAdminBadService extends AdminService { + MockRMAdminBadService(ResourceManager rm) { + super(rm); + } + + @Override + public void refreshQueues() throws IOException, YarnException { + throw new ConnectException("RM is stopped"); + } + } + + @Override + public void shutdown() { + // if mockRMs is not empty + if (MapUtils.isNotEmpty(mockRMs)) { + for (Map.Entry<SubClusterId, MockRM> item : mockRMs.entrySet()) { + SubClusterId subClusterId = item.getKey(); + // close mockRM. + MockRM mockRM = item.getValue(); + if (mockRM != null) { + LOG.info("subClusterId = {} mockRM shutdown.", subClusterId); + mockRM.stop(); + } + } + } + mockRMs.clear(); + super.shutdown(); + } +}
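
Reviewer notes (not part of the patch). The interaction between FederationMethodWrapper and RMAdminProtocolMethod.invokeConcurrent boils down to: resolve the admin method by name via reflection, invoke it against every active sub-cluster in parallel, and treat the call as failed if any sub-cluster fails. The following is a minimal, self-contained sketch of that pattern only; the AdminClient interface, the sub-cluster ids and the result strings are hypothetical placeholders, not YARN APIs.

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical stand-in for a per-sub-cluster admin protocol. */
interface AdminClient {
  String refreshQueues();
}

public final class FanOutSketch {

  public static void main(String[] args) throws Exception {
    // Two hypothetical sub-clusters, keyed by id.
    Map<String, AdminClient> subClusters = new TreeMap<>();
    subClusters.put("SC-1", () -> "SC-1 refreshed");
    subClusters.put("SC-2", () -> "SC-2 refreshed");

    // Resolve the target method by name, as RMAdminProtocolMethod does with
    // ResourceManagerAdministrationProtocol.
    Method method = AdminClient.class.getMethod("refreshQueues");

    // One Callable per sub-cluster, executed in parallel.
    ExecutorService pool = Executors.newFixedThreadPool(subClusters.size());
    List<Callable<Object>> callables = new ArrayList<>();
    for (AdminClient client : subClusters.values()) {
      callables.add(() -> method.invoke(client));
    }

    // Every sub-cluster must succeed; any failure surfaces via Future#get.
    List<Object> results = new ArrayList<>();
    for (Future<Object> future : pool.invokeAll(callables)) {
      results.add(future.get());
    }
    pool.shutdown();
    System.out.println(results); // [SC-1 refreshed, SC-2 refreshed]
  }
}
```

The real implementation additionally collects per-sub-cluster exceptions into a map and throws a single YarnException naming the failed sub-clusters, which is what makes refreshQueues all-or-nothing across the federation.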
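
The branch order in the new RouterServerUtil#setupUser is easy to misread, so here is a condensed restatement of its decision table plus a trivial driver. It only re-expresses the logic already in the patch; the "router-test" user name is made up for the demonstration.

```java
import java.io.IOException;
import org.apache.hadoop.security.UserGroupInformation;

public final class SetupUserSketch {

  /** Condensed restatement of the decision table in RouterServerUtil#setupUser. */
  static UserGroupInformation resolve(String userName) throws IOException {
    if (userName == null || userName.trim().isEmpty()) {
      // No user name supplied: fall back to the caller's own UGI.
      return UserGroupInformation.getCurrentUser();
    }
    if (UserGroupInformation.isSecurityEnabled()) {
      // Kerberos: always proxy the named user on top of the login user.
      return UserGroupInformation.createProxyUser(userName,
          UserGroupInformation.getLoginUser());
    }
    if (userName.equalsIgnoreCase(
        UserGroupInformation.getCurrentUser().getUserName())) {
      // Simple auth and the name matches the current UGI: reuse it.
      return UserGroupInformation.getCurrentUser();
    }
    // Simple auth with a different name: proxy on top of the current UGI.
    return UserGroupInformation.createProxyUser(userName,
        UserGroupInformation.getCurrentUser());
  }

  public static void main(String[] args) throws IOException {
    // "router-test" is a made-up name used only for this demonstration.
    System.out.println(resolve("router-test").getUserName());
  }
}
```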
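
Finally, a hedged sketch of how a deployment (or a test) would actually select the new interceptor: FederationRMAdminInterceptor has to be the last entry of the Router's RM-admin interceptor pipeline and federation has to be switched on. This mirrors what TestFederationRMAdminInterceptor#createConfiguration does; setting the values programmatically on a YarnConfiguration is shown only for illustration, and the same keys can equally be set in yarn-site.xml.

```java
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public final class RouterPipelineConfigSketch {
  public static void main(String[] args) {
    YarnConfiguration conf = new YarnConfiguration();

    // The fan-out only makes sense when YARN Federation is enabled.
    conf.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true);

    // FederationRMAdminInterceptor must be the last (here: the only)
    // interceptor in the Router's RM-admin pipeline.
    conf.set(YarnConfiguration.ROUTER_RMADMIN_INTERCEPTOR_CLASS_PIPELINE,
        "org.apache.hadoop.yarn.server.router.rmadmin.FederationRMAdminInterceptor");

    System.out.println(
        conf.get(YarnConfiguration.ROUTER_RMADMIN_INTERCEPTOR_CLASS_PIPELINE));
  }
}
```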