From 33ce7f6c072144f55be30c66099eef5bc736405e Mon Sep 17 00:00:00 2001
From: Karthik Kambatla
Date: Thu, 11 Feb 2016 20:37:03 -0800
Subject: [PATCH] HDFS-9780. RollingFileSystemSink doesn't work on secure
 clusters. (Daniel Templeton via kasha)

---
 .../metrics2/sink/RollingFileSystemSink.java  | 138 ++++++++-
 .../sink/RollingFileSystemSinkTestBase.java   |  60 +++-
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt   |   3 +
 .../TestRollingFileSystemSinkWithHdfs.java    |   9 +-
 ...stRollingFileSystemSinkWithSecureHdfs.java | 283 ++++++++++++++++++
 5 files changed, 466 insertions(+), 27 deletions(-)
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/metrics2/sink/TestRollingFileSystemSinkWithSecureHdfs.java

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/RollingFileSystemSink.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/RollingFileSystemSink.java
index de403cab30..2c0a26a43d 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/RollingFileSystemSink.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/RollingFileSystemSink.java
@@ -47,6 +47,8 @@ import org.apache.hadoop.metrics2.MetricsException;
 import org.apache.hadoop.metrics2.MetricsRecord;
 import org.apache.hadoop.metrics2.MetricsSink;
 import org.apache.hadoop.metrics2.MetricsTag;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
 
 /**
  * This class is a metrics sink that uses
@@ -107,6 +109,14 @@ import org.apache.hadoop.metrics2.MetricsTag;
  * the data is being written successfully. This is a known HDFS limitation that
  * exists because of the performance cost of updating the metadata. See
  * HDFS-5478.
+ *
+ * When using this sink in a secure (Kerberos) environment, two additional
+ * properties must be set: keytab-key and
+ * principal-key. keytab-key should contain the key by
+ * which the keytab file can be found in the configuration, for example,
+ * yarn.nodemanager.keytab. principal-key should
+ * contain the key by which the principal can be found in the configuration,
+ * for example, yarn.nodemanager.principal.
 */
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
@@ -115,6 +125,8 @@ public class RollingFileSystemSink implements MetricsSink, Closeable {
   private static final String SOURCE_KEY = "source";
   private static final String IGNORE_ERROR_KEY = "ignore-error";
   private static final String ALLOW_APPEND_KEY = "allow-append";
+  private static final String KEYTAB_PROPERTY_KEY = "keytab-key";
+  private static final String USERNAME_PROPERTY_KEY = "principal-key";
   private static final String SOURCE_DEFAULT = "unknown";
   private static final String BASEPATH_DEFAULT = "/tmp";
   private static final FastDateFormat DATE_FORMAT =
@@ -134,10 +146,21 @@ public class RollingFileSystemSink implements MetricsSink, Closeable {
   // We keep this only to be able to call hsynch() on it.
   private FSDataOutputStream currentFSOutStream;
   private Timer flushTimer;
+
+  // This flag is used during testing to make the flusher thread run after only
+  // a short pause instead of waiting for the top of the hour.
   @VisibleForTesting
-  protected static boolean isTest = false;
+  protected static boolean flushQuickly = false;
+  // This flag is used by the flusher thread to indicate that it has run. Used
+  // only for testing purposes.
   @VisibleForTesting
   protected static volatile boolean hasFlushed = false;
+  // Use this configuration instead of loading a new one.
+  @VisibleForTesting
+  protected static Configuration suppliedConf = null;
+  // Use this file system instead of getting a new one.
+  @VisibleForTesting
+  protected static FileSystem suppliedFilesystem = null;
 
   @Override
   public void init(SubsetConfiguration conf) {
@@ -146,15 +169,49 @@ public class RollingFileSystemSink implements MetricsSink, Closeable {
     ignoreError = conf.getBoolean(IGNORE_ERROR_KEY, false);
     allowAppend = conf.getBoolean(ALLOW_APPEND_KEY, false);
 
+    Configuration configuration = loadConf();
+
+    UserGroupInformation.setConfiguration(configuration);
+
+    // Don't do secure setup if it's not needed.
+    if (UserGroupInformation.isSecurityEnabled()) {
+      // Validate config so that we don't get an NPE
+      checkForProperty(conf, KEYTAB_PROPERTY_KEY);
+      checkForProperty(conf, USERNAME_PROPERTY_KEY);
+
+
+      try {
+        // Login as whoever we're supposed to be and let the hostname be pulled
+        // from localhost. If security isn't enabled, this does nothing.
+        SecurityUtil.login(configuration, conf.getString(KEYTAB_PROPERTY_KEY),
+            conf.getString(USERNAME_PROPERTY_KEY));
+      } catch (IOException ex) {
+        throw new MetricsException("Error logging in securely: ["
+            + ex.toString() + "]", ex);
+      }
+    }
+
+    fileSystem = getFileSystem(configuration);
+
+    // This step isn't strictly necessary, but it makes debugging issues much
+    // easier. We try to create the base directory eagerly and fail with
+    // copious debug info if it fails.
     try {
-      fileSystem = FileSystem.get(new URI(basePath.toString()),
-          new Configuration());
-    } catch (URISyntaxException ex) {
-      throw new MetricsException("The supplied filesystem base path URI"
-          + " is not a valid URI: " + basePath.toString(), ex);
-    } catch (IOException ex) {
-      throw new MetricsException("Error connecting to file system: "
-          + basePath + " [" + ex.toString() + "]", ex);
+      fileSystem.mkdirs(basePath);
+    } catch (Exception ex) {
+      throw new MetricsException("Failed to create " + basePath + "["
+          + SOURCE_KEY + "=" + source + ", "
+          + IGNORE_ERROR_KEY + "=" + ignoreError + ", "
+          + ALLOW_APPEND_KEY + "=" + allowAppend + ", "
+          + KEYTAB_PROPERTY_KEY + "="
+          + conf.getString(KEYTAB_PROPERTY_KEY) + ", "
+          + conf.getString(KEYTAB_PROPERTY_KEY) + "="
+          + configuration.get(conf.getString(KEYTAB_PROPERTY_KEY)) + ", "
+          + USERNAME_PROPERTY_KEY + "="
+          + conf.getString(USERNAME_PROPERTY_KEY) + ", "
+          + conf.getString(USERNAME_PROPERTY_KEY) + "="
+          + configuration.get(conf.getString(USERNAME_PROPERTY_KEY))
+          + "] -- " + ex.toString(), ex);
     }
 
     // If we're permitted to append, check if we actually can
@@ -165,6 +222,67 @@ public class RollingFileSystemSink implements MetricsSink, Closeable {
     flushTimer = new Timer("RollingFileSystemSink Flusher", true);
   }
 
+  /**
+   * Throw a {@link MetricsException} if the given property is not set.
+   *
+   * @param conf the configuration to test
+   * @param key the key to validate
+   */
+  private static void checkForProperty(SubsetConfiguration conf, String key) {
+    if (!conf.containsKey(key)) {
+      throw new MetricsException("Configuration is missing " + key
+          + " property");
+    }
+  }
+
+  /**
+   * Return the supplied configuration for testing or otherwise load a new
+   * configuration.
+   *
+   * @return the configuration to use
+   */
+  private Configuration loadConf() {
+    Configuration conf;
+
+    if (suppliedConf != null) {
+      conf = suppliedConf;
+    } else {
+      // The config we're handed in init() isn't the one we want here, so we
+      // create a new one to pick up the full settings.
+      conf = new Configuration();
+    }
+
+    return conf;
+  }
+
+  /**
+   * Return the supplied file system for testing or otherwise get a new file
+   * system.
+   *
+   * @param conf the configuration
+   * @return the file system to use
+   * @throws MetricsException thrown if the file system could not be retrieved
+   */
+  private FileSystem getFileSystem(Configuration conf) throws MetricsException {
+    FileSystem fs = null;
+
+    if (suppliedFilesystem != null) {
+      fs = suppliedFilesystem;
+    } else {
+      try {
+        fs = FileSystem.get(new URI(basePath.toString()), conf);
+      } catch (URISyntaxException ex) {
+        throw new MetricsException("The supplied filesystem base path URI"
+            + " is not a valid URI: " + basePath.toString(), ex);
+      } catch (IOException ex) {
+        throw new MetricsException("Error connecting to file system: "
+            + basePath + " [" + ex.toString() + "]", ex);
+      }
+    }
+
+    return fs;
+  }
+
   /**
    * Test whether the file system supports append and return the answer.
    * @param fs the target file system
@@ -232,7 +350,7 @@ public class RollingFileSystemSink implements MetricsSink, Closeable {
 
     next.setTime(now);
 
-    if (isTest) {
+    if (flushQuickly) {
       // If we're running unit tests, flush after a short pause
       next.add(Calendar.MILLISECOND, 400);
     } else {
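The sink's new init() sequence above is easier to review when condensed. The following standalone sketch shows the same flow — load the daemon's full configuration, log in via SecurityUtil when security is enabled, then create the filesystem handle and base directory eagerly. The yarn.nodemanager.* keys and the hdfs://nn:8020/metrics URI are illustrative assumptions, not part of the patch:

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.security.SecurityUtil;
    import org.apache.hadoop.security.UserGroupInformation;

    public class SecureSinkInitSketch {
      public static void main(String[] args) throws Exception {
        // Load the daemon's full configuration; the SubsetConfiguration the
        // metrics system hands to init() contains only the sink's properties.
        Configuration conf = new Configuration();
        UserGroupInformation.setConfiguration(conf);

        if (UserGroupInformation.isSecurityEnabled()) {
          // keytab-key and principal-key point at configuration *keys*; the
          // login resolves them to the actual keytab path and principal.
          SecurityUtil.login(conf, "yarn.nodemanager.keytab",
              "yarn.nodemanager.principal");
        }

        // The handle is created as the logged-in user, and the base directory
        // is created eagerly so a misconfiguration fails fast.
        FileSystem fs = FileSystem.get(new URI("hdfs://nn:8020/metrics"), conf);
        fs.mkdirs(new Path("/metrics"));
      }
    }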
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/RollingFileSystemSinkTestBase.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/RollingFileSystemSinkTestBase.java
index 292d1fcfd2..f1ad058d0a 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/RollingFileSystemSinkTestBase.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/RollingFileSystemSinkTestBase.java
@@ -32,18 +32,18 @@ import java.util.Calendar;
 import java.util.Date;
 import java.util.TimeZone;
 import java.util.regex.Pattern;
-import org.apache.commons.io.FileUtils;
+import org.apache.commons.configuration.SubsetConfiguration;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.metrics2.MetricsException;
 import org.apache.hadoop.metrics2.MetricsRecord;
-
 import org.apache.hadoop.metrics2.MetricsSystem;
 import org.apache.hadoop.metrics2.annotation.Metric;
 import org.apache.hadoop.metrics2.annotation.Metrics;
@@ -66,9 +66,11 @@ import static org.junit.Assert.assertTrue;
  * methods for classes that extend it.
  */
 public class RollingFileSystemSinkTestBase {
+  protected static final String SINK_PRINCIPAL_KEY = "rfssink.principal";
+  protected static final String SINK_KEYTAB_FILE_KEY = "rfssink.keytab";
   protected static final File ROOT_TEST_DIR =
-      new File(System.getProperty("test.build.data", "target/"),
-          "FileSystemSinkTest");
+      new File(System.getProperty("test.build.data", "target/test"),
+          "RollingFileSystemSinkTest");
   protected static final SimpleDateFormat DATE_FORMAT =
       new SimpleDateFormat("yyyyMMddHH");
   protected static File methodDir;
@@ -118,8 +120,9 @@ public class RollingFileSystemSinkTestBase {
    * Set the date format's timezone to GMT.
    */
   @BeforeClass
-  public static void setTZ() {
+  public static void setup() {
     DATE_FORMAT.setTimeZone(TimeZone.getTimeZone("GMT"));
+    FileUtil.fullyDelete(ROOT_TEST_DIR);
   }
 
   /**
@@ -128,7 +131,7 @@ public class RollingFileSystemSinkTestBase {
    */
   @AfterClass
   public static void deleteBaseDir() throws IOException {
-    FileUtils.deleteDirectory(ROOT_TEST_DIR);
+    FileUtil.fullyDelete(ROOT_TEST_DIR);
   }
 
   /**
@@ -139,11 +142,14 @@ public class RollingFileSystemSinkTestBase {
   public void createMethodDir() throws IOException {
     methodDir = new File(ROOT_TEST_DIR, methodName.getMethodName());
 
-    methodDir.mkdirs();
+    assertTrue("Test directory already exists: " + methodDir,
+        methodDir.mkdirs());
   }
 
   /**
-   * Set up the metrics system, start it, and return it.
+   * Set up the metrics system, start it, and return it. The principal and
+   * keytab properties will not be set.
+   *
    * @param path the base path for the sink
    * @param ignoreErrors whether the sink should ignore errors
    * @param allowAppend whether the sink is allowed to append to existing files
@@ -151,18 +157,37 @@ public class RollingFileSystemSinkTestBase {
    */
   protected MetricsSystem initMetricsSystem(String path, boolean ignoreErrors,
       boolean allowAppend) {
+    return initMetricsSystem(path, ignoreErrors, allowAppend, false);
+  }
+
+  /**
+   * Set up the metrics system, start it, and return it.
+   * @param path the base path for the sink
+   * @param ignoreErrors whether the sink should ignore errors
+   * @param allowAppend whether the sink is allowed to append to existing files
+   * @param useSecureParams whether to set the principal and keytab properties
+   * @return the org.apache.hadoop.metrics2.MetricsSystem
+   */
+  protected MetricsSystem initMetricsSystem(String path, boolean ignoreErrors,
+      boolean allowAppend, boolean useSecureParams) {
     // If the prefix is not lower case, the metrics system won't be able to
     // read any of the properties.
-    final String prefix = methodName.getMethodName().toLowerCase();
+    String prefix = methodName.getMethodName().toLowerCase();
 
-    new ConfigBuilder().add("*.period", 10000)
+    ConfigBuilder builder = new ConfigBuilder().add("*.period", 10000)
         .add(prefix + ".sink.mysink0.class", ErrorSink.class.getName())
         .add(prefix + ".sink.mysink0.basepath", path)
         .add(prefix + ".sink.mysink0.source", "testsrc")
         .add(prefix + ".sink.mysink0.context", "test1")
         .add(prefix + ".sink.mysink0.ignore-error", ignoreErrors)
-        .add(prefix + ".sink.mysink0.allow-append", allowAppend)
-        .save(TestMetricsConfig.getTestFilename("hadoop-metrics2-" + prefix));
+        .add(prefix + ".sink.mysink0.allow-append", allowAppend);
+
+    if (useSecureParams) {
+      builder.add(prefix + ".sink.mysink0.keytab-key", SINK_KEYTAB_FILE_KEY)
+          .add(prefix + ".sink.mysink0.principal-key", SINK_PRINCIPAL_KEY);
+    }
+
+    builder.save(TestMetricsConfig.getTestFilename("hadoop-metrics2-" + prefix));
 
     MetricsSystemImpl ms = new MetricsSystemImpl(prefix);
@@ -481,6 +506,17 @@ public class RollingFileSystemSinkTestBase {
   public static class ErrorSink extends RollingFileSystemSink {
     public static volatile boolean errored = false;
 
+    @Override
+    public void init(SubsetConfiguration conf) {
+      try {
+        super.init(conf);
+      } catch (MetricsException ex) {
+        errored = true;
+
+        throw new MetricsException(ex);
+      }
+    }
+
     @Override
     public void putMetrics(MetricsRecord record) {
       try {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index e2463f10bc..73a09d6627 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -996,6 +996,9 @@ Release 2.9.0 - UNRELEASED
 
     HDFS-9637. Tests for RollingFileSystemSink. (Daniel Templeton via kasha)
 
+    HDFS-9780. RollingFileSystemSink doesn't work on secure clusters.
+    (Daniel Templeton via kasha)
+
   OPTIMIZATIONS
 
   BUG FIXES
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/metrics2/sink/TestRollingFileSystemSinkWithHdfs.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/metrics2/sink/TestRollingFileSystemSinkWithHdfs.java
index 56ff773652..0f3725b60d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/metrics2/sink/TestRollingFileSystemSinkWithHdfs.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/metrics2/sink/TestRollingFileSystemSinkWithHdfs.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.metrics2.sink;
 
 import java.io.IOException;
 import java.net.URI;
+import org.junit.After;
+import org.junit.Before;
 import java.util.Calendar;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -28,11 +30,9 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.metrics2.MetricsSystem;
 import org.apache.hadoop.metrics2.sink.RollingFileSystemSinkTestBase.MyMetrics1;
-import org.junit.After;
 import org.junit.Test;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
-import org.junit.Before;
 
 /**
  * Test the {@link RollingFileSystemSink} class in the context of HDFS.
@@ -58,7 +58,7 @@ public class TestRollingFileSystemSinkWithHdfs
         new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES).build();
 
     // Also clear sink flags
-    RollingFileSystemSink.isTest = false;
+    RollingFileSystemSink.flushQuickly = false;
     RollingFileSystemSink.hasFlushed = false;
   }
 
@@ -251,8 +251,7 @@ public class TestRollingFileSystemSinkWithHdfs
    */
   @Test
   public void testFlushThread() throws Exception {
-    RollingFileSystemSink.isTest = true;
-    RollingFileSystemSink.hasFlushed = false;
+    RollingFileSystemSink.flushQuickly = true;
 
     String path = "hdfs://" + cluster.getNameNode().getHostAndPort() + "/tmp";
     MetricsSystem ms = initMetricsSystem(path, true, false);
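The new secure test below leans on the @VisibleForTesting hooks introduced in RollingFileSystemSink. The injection pattern, distilled into a sketch (assuming a test in the same package with a running secure MiniDFSCluster; runWithInjectedSink is a hypothetical helper name, not part of the patch):

    // Hand the sink a prepared secure Configuration and FileSystem instead of
    // letting init() build its own, then always restore the static state.
    private void runWithInjectedSink(MiniDFSCluster cluster,
        HdfsConfiguration conf) throws Exception {
      RollingFileSystemSink.suppliedConf = conf;
      RollingFileSystemSink.suppliedFilesystem = cluster.getFileSystem();

      try {
        // ... exercise the sink, e.g. initMetricsSystem(path, true, false, true) ...
      } finally {
        // Clear the hooks and reset UGI so later tests start non-secure.
        UserGroupInformation.setConfiguration(new Configuration());
        RollingFileSystemSink.suppliedConf = null;
        RollingFileSystemSink.suppliedFilesystem = null;
      }
    }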
+ */ + +package org.apache.hadoop.metrics2.sink; + +import java.io.File; +import java.io.IOException; +import java.security.PrivilegedExceptionAction; +import java.util.Properties; +import org.apache.hadoop.conf.Configuration; +import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_KEY; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_HTTPS_KEYSTORE_RESOURCE_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HTTP_POLICY_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SERVER_HTTPS_KEYSTORE_RESOURCE_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY; +import org.apache.hadoop.http.HttpConfig; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.minikdc.MiniKdc; +import org.apache.hadoop.security.NullGroupsMapping; +import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.ssl.KeyStoreTestUtil; +import org.junit.Test; +import static org.junit.Assert.assertTrue; + +/** + * Test the {@link RollingFileSystemSink} class in the context of HDFS with + * Kerberos enabled. + */ +public class TestRollingFileSystemSinkWithSecureHdfs + extends RollingFileSystemSinkTestBase { + private static final int NUM_DATANODES = 4; + private static MiniKdc kdc; + private static String sinkPrincipal; + private static String sinkKeytab; + private static String hdfsPrincipal; + private static String hdfsKeytab; + private static String spnegoPrincipal; + + /** + * Do a basic write test against an HDFS cluster with Kerberos enabled. We + * assume that if a basic write succeeds, more complex operations will also + * succeed. 
+ * + * @throws Exception thrown if things break + */ + @Test + public void testWithSecureHDFS() throws Exception { + RollingFileSystemSink.flushQuickly = false; + RollingFileSystemSink.hasFlushed = false; + initKdc(); + + MiniDFSCluster cluster = null; + + try { + HdfsConfiguration conf = createSecureConfig("authentication,privacy"); + + RollingFileSystemSink.suppliedConf = conf; + + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES) + .build(); + + cluster.waitActive(); + + UserGroupInformation sink = createDirectoriesSecurely(cluster); + final String path = + "hdfs://" + cluster.getNameNode().getHostAndPort() + "/tmp/test"; + final MetricsSystem ms = initMetricsSystem(path, true, false, true); + + assertMetricsContents( + sink.doAs(new PrivilegedExceptionAction() { + @Override + public String run() throws Exception { + return doWriteTest(ms, path, 1); + } + })); + } finally { + if (cluster != null) { + cluster.shutdown(); + } + + shutdownKdc(); + + // Restore non-secure conf + UserGroupInformation.setConfiguration(new Configuration()); + RollingFileSystemSink.suppliedConf = null; + RollingFileSystemSink.suppliedFilesystem = null; + } + } + + /** + * Do a basic write test against an HDFS cluster with Kerberos enabled but + * without the principal and keytab properties set. + * + * @throws Exception thrown if things break + */ + @Test + public void testMissingPropertiesWithSecureHDFS() throws Exception { + RollingFileSystemSink.flushQuickly = false; + RollingFileSystemSink.hasFlushed = false; + initKdc(); + + MiniDFSCluster cluster = null; + + try { + HdfsConfiguration conf = createSecureConfig("authentication,privacy"); + + RollingFileSystemSink.suppliedConf = conf; + + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES) + .build(); + + final String path = + "hdfs://" + cluster.getNameNode().getHostAndPort() + "/tmp/test"; + + createDirectoriesSecurely(cluster); + initMetricsSystem(path, true, false); + + assertTrue("No exception was generated initializing the sink against a " + + "secure cluster even though the principal and keytab properties " + + "were missing", ErrorSink.errored); + } finally { + if (cluster != null) { + cluster.shutdown(); + } + + shutdownKdc(); + + // Restore non-secure conf + UserGroupInformation.setConfiguration(new Configuration()); + RollingFileSystemSink.suppliedConf = null; + RollingFileSystemSink.suppliedFilesystem = null; + } + } + + /** + * Create the /tmp directory as hdfs and /tmp/test as sink and + * return the UGI for sink. 
+ * + * @param cluster the mini-cluster + * @return the UGI for sink + * @throws IOException thrown if login or directory creation fails + * @throws InterruptedException thrown if interrupted while creating a + * file system handle + */ + protected UserGroupInformation createDirectoriesSecurely(final MiniDFSCluster cluster) + throws IOException, InterruptedException { + Path tmp = new Path("/tmp"); + Path test = new Path(tmp, "test"); + + UserGroupInformation hdfs = + UserGroupInformation.loginUserFromKeytabAndReturnUGI(hdfsPrincipal, + hdfsKeytab); + FileSystem fsForSuperUser = + hdfs.doAs(new PrivilegedExceptionAction() { + @Override + public FileSystem run() throws Exception { + return cluster.getFileSystem(); + } + }); + + fsForSuperUser.mkdirs(tmp); + fsForSuperUser.setPermission(tmp, new FsPermission((short)0777)); + + UserGroupInformation sink = + UserGroupInformation.loginUserFromKeytabAndReturnUGI(sinkPrincipal, + sinkKeytab); + FileSystem fsForSink = + sink.doAs(new PrivilegedExceptionAction() { + @Override + public FileSystem run() throws Exception { + return cluster.getFileSystem(); + } + }); + + fsForSink.mkdirs(test); + RollingFileSystemSink.suppliedFilesystem = fsForSink; + + return sink; + } + + /** + * Setup the KDC for testing a secure HDFS cluster + * + * @throws Exception thrown if the KDC setup fails + */ + public static void initKdc() throws Exception { + Properties kdcConf = MiniKdc.createConf(); + kdc = new MiniKdc(kdcConf, methodDir); + kdc.start(); + + File sinkKeytabFile = new File(methodDir, "sink.keytab"); + sinkKeytab = sinkKeytabFile.getAbsolutePath(); + kdc.createPrincipal(sinkKeytabFile, "sink/localhost"); + sinkPrincipal = "sink/localhost@" + kdc.getRealm(); + + File hdfsKeytabFile = new File(methodDir, "hdfs.keytab"); + hdfsKeytab = hdfsKeytabFile.getAbsolutePath(); + kdc.createPrincipal(hdfsKeytabFile, "hdfs/localhost", + "HTTP/localhost"); + hdfsPrincipal = "hdfs/localhost@" + kdc.getRealm(); + spnegoPrincipal = "HTTP/localhost@" + kdc.getRealm(); + } + + /** + * Stop the mini-KDC. + */ + public static void shutdownKdc() { + if (kdc != null) { + kdc.stop(); + } + } + + /** + * Creates a configuration for starting a secure cluster. 
+ * + * @param dataTransferProtection supported QOPs + * @return configuration for starting a secure cluster + * @throws Exception if there is any failure + */ + protected HdfsConfiguration createSecureConfig( + String dataTransferProtection) throws Exception { + HdfsConfiguration conf = new HdfsConfiguration(); + + SecurityUtil.setAuthenticationMethod( + UserGroupInformation.AuthenticationMethod.KERBEROS, conf); + conf.set(DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY, hdfsPrincipal); + conf.set(DFS_NAMENODE_KEYTAB_FILE_KEY, hdfsKeytab); + conf.set(DFS_DATANODE_KERBEROS_PRINCIPAL_KEY, hdfsPrincipal); + conf.set(DFS_DATANODE_KEYTAB_FILE_KEY, hdfsKeytab); + conf.set(SINK_PRINCIPAL_KEY, sinkPrincipal); + conf.set(SINK_KEYTAB_FILE_KEY, sinkKeytab); + conf.set(DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY, spnegoPrincipal); + conf.setBoolean(DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true); + conf.set(DFS_DATA_TRANSFER_PROTECTION_KEY, dataTransferProtection); + conf.set(DFS_HTTP_POLICY_KEY, HttpConfig.Policy.HTTPS_ONLY.name()); + conf.set(DFS_NAMENODE_HTTPS_ADDRESS_KEY, "localhost:0"); + conf.set(DFS_DATANODE_HTTPS_ADDRESS_KEY, "localhost:0"); + conf.setInt(IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_KEY, 10); + conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_GROUP_MAPPING, + NullGroupsMapping.class.getName()); + + String keystoresDir = methodDir.getAbsolutePath(); + String sslConfDir = KeyStoreTestUtil.getClasspathDir(this.getClass()); + + KeyStoreTestUtil.setupSSLConfig(keystoresDir, sslConfDir, conf, false); + conf.set(DFS_CLIENT_HTTPS_KEYSTORE_RESOURCE_KEY, + KeyStoreTestUtil.getClientSSLConfigFileName()); + conf.set(DFS_SERVER_HTTPS_KEYSTORE_RESOURCE_KEY, + KeyStoreTestUtil.getServerSSLConfigFileName()); + + return conf; + } +}
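For deployment, the only operator-visible change is the pair of sink properties described in the RollingFileSystemSink javadoc. As a closing illustration, this sketch writes a matching hadoop-metrics2 properties file using the same ConfigBuilder helper the tests above use; the nodemanager prefix, the rfs sink name, the basepath URI, and the yarn.* keys are all illustrative assumptions:

    import org.apache.hadoop.metrics2.impl.ConfigBuilder;

    public class SecureSinkConfigSketch {
      public static void main(String[] args) {
        // keytab-key and principal-key name configuration *keys*: at init
        // time the sink looks up the actual keytab path and principal under
        // these keys in the daemon's Configuration.
        new ConfigBuilder()
            .add("*.period", 10000)
            .add("nodemanager.sink.rfs.class",
                "org.apache.hadoop.metrics2.sink.RollingFileSystemSink")
            .add("nodemanager.sink.rfs.basepath", "hdfs://nn:8020/metrics")
            .add("nodemanager.sink.rfs.keytab-key", "yarn.nodemanager.keytab")
            .add("nodemanager.sink.rfs.principal-key",
                "yarn.nodemanager.principal")
            .save("hadoop-metrics2-nodemanager.properties");
      }
    }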