YARN-11090. [GPG] Support Secure Mode. (#5782)

This commit is contained in:
slfan1989 2023-07-03 23:56:24 +08:00 committed by GitHub
parent e8590adb7b
commit 6042d59904
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 316 additions and 0 deletions

View File

@ -4366,6 +4366,16 @@ public class YarnConfiguration extends Configuration {
FEDERATION_GPG_PREFIX + "subcluster.heartbeat.expiration-ms";
public static final long DEFAULT_GPG_SUBCLUSTER_EXPIRATION_MS = TimeUnit.MINUTES.toMillis(30);
/** Keytab for GPG. **/
public static final String GPG_KEYTAB = FEDERATION_GPG_PREFIX + "keytab.file";
/** The Kerberos principal for the globalpolicygenerator.*/
public static final String GPG_PRINCIPAL = FEDERATION_GPG_PREFIX + "kerberos.principal";
/** The Kerberos principal hostname for the yarn gpg.*/
public static final String GPG_KERBEROS_PRINCIPAL_HOSTNAME_KEY = FEDERATION_GPG_PREFIX +
"kerberos.principal.hostname";
/**
* Connection and Read timeout from the Router to RM.
*/

View File

@ -5376,4 +5376,36 @@
<value>false</value>
</property>
<property>
<name>yarn.federation.gpg.keytab.file</name>
<value></value>
<description>
The keytab file used by gpg to login as its
service principal. The principal name is configured with
dfs.federation.router.kerberos.principal.
</description>
</property>
<property>
<name>yarn.federation.gpg.kerberos.principal</name>
<value></value>
<description>
The GPG service principal. This is typically set to
gpg/_HOST@REALM.TLD. Each GPG will substitute _HOST with its
own fully qualified hostname at startup. The _HOST placeholder
allows using the same configuration setting on both GPG setup.
</description>
</property>
<property>
<name>yarn.federation.gpg.kerberos.principal.hostname</name>
<value></value>
<description>
Optional.
The hostname for the Router containing this
configuration file. Will be different for each machine.
Defaults to current hostname.
</description>
</property>
</configuration>

View File

@ -86,6 +86,19 @@
<classifier>jdk8</classifier>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-minikdc</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-auth</artifactId>
<scope>test</scope>
<type>test-jar</type>
</dependency>
</dependencies>
<build>

View File

@ -18,6 +18,9 @@
package org.apache.hadoop.yarn.server.globalpolicygenerator;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@ -25,11 +28,13 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang.time.DurationFormatUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.apache.hadoop.yarn.server.globalpolicygenerator.subclustercleaner.SubClusterCleaner;
import org.slf4j.Logger;
@ -68,6 +73,12 @@ public class GlobalPolicyGenerator extends CompositeService {
this.gpgContext = new GPGContextImpl();
}
protected void doSecureLogin() throws IOException {
Configuration config = getConfig();
SecurityUtil.login(config, YarnConfiguration.GPG_KEYTAB,
YarnConfiguration.GPG_PRINCIPAL, getHostName(config));
}
protected void initAndStart(Configuration conf, boolean hasToReboot) {
// Remove the old hook if we are rebooting.
if (hasToReboot && null != gpgShutdownHook) {
@ -99,6 +110,12 @@ public class GlobalPolicyGenerator extends CompositeService {
@Override
protected void serviceStart() throws Exception {
try {
doSecureLogin();
} catch (IOException e) {
throw new YarnRuntimeException("Failed GPG login", e);
}
super.serviceStart();
// Scheduler SubClusterCleaner service
@ -156,6 +173,23 @@ public class GlobalPolicyGenerator extends CompositeService {
}
}
/**
* Returns the hostname for this Router. If the hostname is not
* explicitly configured in the given config, then it is determined.
*
* @param config configuration
* @return the hostname (NB: may not be a FQDN)
* @throws UnknownHostException if the hostname cannot be determined
*/
private String getHostName(Configuration config)
throws UnknownHostException {
String name = config.get(YarnConfiguration.GPG_KERBEROS_PRINCIPAL_HOSTNAME_KEY);
if (name == null) {
name = InetAddress.getLocalHost().getHostName();
}
return name;
}
public static void main(String[] argv) {
try {
startGPG(argv, new YarnConfiguration());

View File

@ -0,0 +1,177 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.globalpolicygenerator.secure;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.minikdc.MiniKdc;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.apache.hadoop.yarn.server.globalpolicygenerator.GlobalPolicyGenerator;
import org.junit.BeforeClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.util.Properties;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
public abstract class AbstractGlobalPolicyGeneratorTest {
private static final Logger LOG =
LoggerFactory.getLogger(AbstractGlobalPolicyGeneratorTest.class);
////////////////////////////////
// Kerberos Constants
////////////////////////////////
public static final String REALM = "EXAMPLE.COM";
public static final String GPG = "gpg";
public static final String LOCALHOST = "localhost";
public static final String IP127001 = "127.0.0.1";
public static final String GPG_LOCALHOST = "gpg/" + LOCALHOST;
public static final String GPG_LOCALHOST_REALM = GPG_LOCALHOST + "@" + REALM;
public static final String SUN_SECURITY_KRB5_DEBUG = "sun.security.krb5.debug";
public static final String KERBEROS = "kerberos";
////////////////////////////////
// BeforeSecureRouterTestClass Init
////////////////////////////////
private static MiniKdc kdc;
private static File routerKeytab;
private static File kdcWorkDir;
private static Configuration conf;
private GlobalPolicyGenerator gpg;
@BeforeClass
public static void beforeSecureRouterTestClass() throws Exception {
// Sets up the KDC and Principals.
setupKDCAndPrincipals();
// Init YarnConfiguration
conf = new YarnConfiguration();
// Enable Kerberos authentication configuration
conf.setBoolean(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, true);
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, KERBEROS);
// Router Kerberos KeyTab configuration
conf.set(YarnConfiguration.GPG_PRINCIPAL, GPG_LOCALHOST_REALM);
conf.set(YarnConfiguration.GPG_KEYTAB, routerKeytab.getAbsolutePath());
DefaultMetricsSystem.setMiniClusterMode(true);
}
/**
* Sets up the KDC and Principals.
*
* @throws Exception an error occurred.
*/
public static void setupKDCAndPrincipals() throws Exception {
// set up the KDC
File target = new File(System.getProperty("test.dir", "target"));
kdcWorkDir = new File(target, "kdc");
kdcWorkDir.mkdirs();
if (!kdcWorkDir.mkdirs()) {
assertTrue(kdcWorkDir.isDirectory());
}
Properties kdcConf = MiniKdc.createConf();
kdcConf.setProperty(MiniKdc.DEBUG, "true");
kdc = new MiniKdc(kdcConf, kdcWorkDir);
kdc.start();
routerKeytab = createKeytab(GPG, "gpg.keytab");
}
/**
* Create the keytab for the given principal, includes
* raw principal and $principal/localhost.
*
* @param principal principal short name.
* @param filename filename of keytab.
* @return file of keytab.
* @throws Exception an error occurred.
*/
public static File createKeytab(String principal, String filename) throws Exception {
assertTrue("empty principal", StringUtils.isNotBlank(principal));
assertTrue("empty host", StringUtils.isNotBlank(filename));
assertNotNull("null KDC", kdc);
File keytab = new File(kdcWorkDir, filename);
kdc.createPrincipal(keytab,
principal,
principal + "/localhost",
principal + "/127.0.0.1");
return keytab;
}
/**
* Start the router in safe mode.
*
* @throws Exception an error occurred.
*/
public synchronized void startSecureGPG() {
assertNull("GPG is already running", gpg);
MemoryFederationStateStore stateStore = new MemoryFederationStateStore();
stateStore.init(conf);
FederationStateStoreFacade.getInstance().reinitialize(stateStore, conf);
UserGroupInformation.setConfiguration(conf);
gpg = new GlobalPolicyGenerator();
gpg.init(conf);
gpg.start();
}
/**
* Shut down the KDC service.
*
* @throws Exception an error occurred.
*/
public static void teardownKDC() throws Exception {
if (kdc != null) {
kdc.stop();
kdc = null;
}
}
public GlobalPolicyGenerator getGpg() {
return gpg;
}
public static MiniKdc getKdc() {
return kdc;
}
/**
* Stop the router in safe mode.
*
* @throws Exception an error occurred.
*/
protected synchronized void stopSecureRouter() throws Exception {
if (gpg != null) {
gpg.stop();
gpg = null;
}
}
}

View File

@ -0,0 +1,50 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.globalpolicygenerator.secure;
import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGContext;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TestGpgSecureLogins extends AbstractGlobalPolicyGeneratorTest {
private static final Logger LOG = LoggerFactory.getLogger(TestGpgSecureLogins.class);
@Test
public void testHasRealm() throws Throwable {
Assert.assertNotNull(getRealm());
LOG.info("Router principal = {}", getPrincipalAndRealm(GPG_LOCALHOST));
}
@Test
public void testRouterSecureLogin() throws Exception {
startSecureGPG();
GPGContext gpgContext = this.getGpg().getGPGContext();
Assert.assertNotNull(gpgContext);
stopSecureRouter();
}
public static String getPrincipalAndRealm(String principal) {
return principal + "@" + getRealm();
}
protected static String getRealm() {
return getKdc().getRealm();
}
}