YARN-11504. [Federation] YARN Federation Supports Non-HA mode. (#5722)

This commit is contained in:
slfan1989 2023-06-13 06:28:13 +08:00 committed by GitHub
parent 2794fe264b
commit a4c3d48c31
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 145 additions and 6 deletions

View File

@ -3989,6 +3989,10 @@ public static boolean isAclEnabled(Configuration conf) {
FEDERATION_PREFIX + "failover.enabled";
public static final boolean DEFAULT_FEDERATION_FAILOVER_ENABLED = true;
/**
 * Configuration key that turns on YARN Federation non-HA mode
 * (FEDERATION_PREFIX followed by "non-ha.enabled").
 */
public static final String FEDERATION_NON_HA_ENABLED =
FEDERATION_PREFIX + "non-ha.enabled";
/** Default for {@link #FEDERATION_NON_HA_ENABLED}: non-HA mode is disabled. */
public static final boolean DEFAULT_FEDERATION_NON_HA_ENABLED = false;
/** Configuration key for the federation state-store client class. */
public static final String FEDERATION_STATESTORE_CLIENT_CLASS =
FEDERATION_PREFIX + "state-store.class";

View File

@ -94,12 +94,8 @@ public static <T> T createRMProxy(final Configuration configuration,
token.setService(ClientRMProxy.getAMRMTokenService(configuration));
setAuthModeInConf(configuration);
}
final T proxyConnection = user.doAs(new PrivilegedExceptionAction<T>() {
@Override
public T run() throws Exception {
return ClientRMProxy.createRMProxy(configuration, protocol);
}
});
final T proxyConnection = user.doAs((PrivilegedExceptionAction<T>) () ->
ClientRMProxy.createRMProxyFederation(configuration, protocol));
return proxyConnection;
} catch (InterruptedException e) {

View File

@ -22,6 +22,7 @@
import java.net.InetSocketAddress;
import java.util.ArrayList;
import org.apache.hadoop.classification.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@ -73,6 +74,29 @@ public static <T> T createRMProxy(final Configuration configuration,
return createRMProxy(configuration, protocol, clientRMProxy);
}
/**
 * Builds a ResourceManager proxy for the requested protocol through the
 * federation-aware creation path.
 * Intended to be called only from the NodeManager's AMRMClientUtils.
 *
 * @param configuration Configuration with all the required information.
 * @param protocol Client protocol for which the proxy is requested.
 * @param <T> Type of proxy.
 * @return Proxy to the ResourceManager for the specified client protocol.
 * @throws IOException if an I/O error occurs while creating the proxy.
 */
public static <T> T createRMProxyFederation(final Configuration configuration,
    final Class<T> protocol) throws IOException {
  // Hand a fresh ClientRMProxy instance to the three-argument overload.
  return createRMProxyFederation(configuration, protocol, new ClientRMProxy<T>());
}
/**
 * Test hook that returns the failover proxy provider selected for a
 * ClientRMProxy under the given configuration.
 *
 * @param configuration YARN configuration that drives the provider choice.
 * @param protocol Client protocol the provider is created for.
 * @param <T> Type of proxy.
 * @return the failover proxy provider chosen by getRMFailoverProxyProvider.
 */
@VisibleForTesting
public static <T> RMFailoverProxyProvider<T> getClientRMFailoverProxyProvider(
    final YarnConfiguration configuration, final Class<T> protocol) {
  return getRMFailoverProxyProvider(configuration, protocol, new ClientRMProxy<T>());
}
private static void setAMRMTokenService(final Configuration conf)
throws IOException {
for (Token<? extends TokenIdentifier> token : UserGroupInformation

View File

@ -116,6 +116,41 @@ protected static <T> T createRMProxy(final Configuration configuration,
return newProxyInstance(conf, protocol, instance, retryPolicy);
}
/**
 * Creates an RM proxy using the federation-specific provider selection.
 * Used only by the NodeManager, and only for non-HA mode, so that a UAM
 * being initialized can find the correct cluster.
 *
 * @param configuration configuration the proxy is built from; it is wrapped
 *                      in a new YarnConfiguration before use.
 * @param protocol protocol the proxy must implement.
 * @param instance RMProxy instance used to create the failover proxy provider.
 * @param <T> type of the proxy.
 * @return the RM proxy.
 * @throws IOException if an I/O error occurs.
 */
protected static <T> T createRMProxyFederation(final Configuration configuration,
    final Class<T> protocol, RMProxy<T> instance) throws IOException {
  final YarnConfiguration conf = new YarnConfiguration(configuration);
  // The retry policy is derived from the regular failover setting.
  final boolean failoverEnabled = isFailoverEnabled(conf);
  final RetryPolicy policy = createRetryPolicy(conf, failoverEnabled);
  return newProxyInstanceFederation(conf, protocol, instance, policy);
}
/**
 * Wraps the failover proxy provider chosen by getRMFailoverProxyProvider in a
 * retrying proxy for the given protocol.
 *
 * @param conf YARN configuration used to pick the provider.
 * @param protocol protocol the proxy is created for.
 * @param instance RMProxy instance used to create the provider.
 * @param retryPolicy retry policy applied by the proxy.
 * @param <T> type of the proxy.
 * @return the retrying proxy, cast to the protocol type.
 */
protected static <T> T newProxyInstanceFederation(final YarnConfiguration conf,
    final Class<T> protocol, RMProxy<T> instance, RetryPolicy retryPolicy) {
  // RetryProxy.create is asked for a proxy of `protocol`, hence the cast to T.
  @SuppressWarnings("unchecked")
  final T retryingProxy = (T) RetryProxy.create(
      protocol, getRMFailoverProxyProvider(conf, protocol, instance), retryPolicy);
  return retryingProxy;
}
/**
 * Selects the failover proxy provider for the federation proxy path.
 *
 * <p>The provider from {@code createRMFailoverProxyProvider} is used when
 * ResourceManager HA is enabled or when federation non-HA mode is switched
 * on; in every other case the non-HA provider is used.
 *
 * @param conf YARN configuration that drives the choice.
 * @param protocol protocol the provider is created for.
 * @param instance RMProxy instance that creates the provider.
 * @param <T> type of the proxy.
 * @return the selected failover proxy provider.
 */
protected static <T> RMFailoverProxyProvider<T> getRMFailoverProxyProvider(
    final YarnConfiguration conf, final Class<T> protocol, RMProxy<T> instance) {
  // The non-HA flag defaults to false, so checking it alone would hand an
  // HA-enabled cluster the non-HA provider. HAUtil is fully qualified so that
  // this method does not depend on an import being present.
  final boolean useFailoverProvider =
      org.apache.hadoop.yarn.conf.HAUtil.isHAEnabled(conf)
          || isFederationNonHAEnabled(conf);
  if (useFailoverProvider) {
    return instance.createRMFailoverProxyProvider(conf, protocol);
  }
  return instance.createNonHaRMFailoverProxyProvider(conf, protocol);
}
/**
* Currently, used by NodeManagers only.
* Create a proxy for the specified protocol. For non-HA,
@ -355,4 +390,24 @@ private static boolean isFailoverEnabled(YarnConfiguration conf) {
return false;
}
/**
 * Tells whether federation non-HA mode is enabled.
 *
 * <p>If the RM is not configured with HA, the NM will not have
 * yarn.resourcemanager.ha.rmIds configured locally.
 *
 * <p>If federation mode is enabled and RMProxy#isFailoverEnabled returns true,
 * the NM tries to look up the yarn.resourcemanager.ha.rmIds property when it
 * starts a container. That lookup fails when the user has not configured HA.
 *
 * <p>To avoid this, set the yarn.federation.non-ha.enabled property on the NM,
 * which tells the NM that it runs in a non-HA environment.
 *
 * @param conf YarnConfiguration to read the flag from.
 * @return true if federation non-HA mode is enabled, false otherwise
 *         (the default).
 */
private static boolean isFederationNonHAEnabled(YarnConfiguration conf) {
  return conf.getBoolean(YarnConfiguration.FEDERATION_NON_HA_ENABLED,
      YarnConfiguration.DEFAULT_FEDERATION_NON_HA_ENABLED);
}
}

View File

@ -5354,4 +5354,16 @@
<value></value>
</property>
<property>
<description>
YARN Federation supports Non-HA mode.
If the cluster is not configured with HA but wants to use YARN Federation,
this option can be used.
Setting it to true enables Non-HA mode, while false disables Non-HA mode.
The default value is false.
</description>
<name>yarn.federation.non-ha.enabled</name>
<value>false</value>
</property>
</configuration>

View File

@ -0,0 +1,48 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.federation.failover;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.client.ClientRMProxy;
import org.apache.hadoop.yarn.client.DefaultNoHARMFailoverProxyProvider;
import org.apache.hadoop.yarn.client.RMFailoverProxyProvider;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.junit.Test;
import static org.junit.Assert.assertTrue;
/**
 * Checks which RMFailoverProxyProvider ClientRMProxy selects, first with an
 * untouched configuration and then with federation non-HA mode enabled.
 */
public class TestFederationRMFailoverProxyProvider {

  @Test
  public void testRMFailoverProxyProvider() throws YarnException {
    final YarnConfiguration conf = new YarnConfiguration();

    // Untouched configuration: the default non-HA provider is expected.
    assertTrue(lookupProvider(conf) instanceof DefaultNoHARMFailoverProxyProvider);

    // Apply the federation settings for "SC-1" and turn on the non-HA switch.
    FederationProxyProviderUtil.updateConfForFederation(conf, "SC-1");
    conf.setBoolean(YarnConfiguration.FEDERATION_NON_HA_ENABLED, true);
    assertTrue(lookupProvider(conf) instanceof FederationRMFailoverProxyProvider);
  }

  /** Resolves the provider ClientRMProxy picks for ApplicationClientProtocol. */
  private static RMFailoverProxyProvider<ApplicationClientProtocol> lookupProvider(
      final YarnConfiguration conf) {
    return ClientRMProxy.getClientRMFailoverProxyProvider(
        conf, ApplicationClientProtocol.class);
  }
}