HDFS-9780. RollingFileSystemSink doesn't work on secure clusters. (Daniel Templeton via kasha)
This commit is contained in:
parent
c7fcec24b8
commit
33ce7f6c07
@ -47,6 +47,8 @@
|
||||
import org.apache.hadoop.metrics2.MetricsRecord;
|
||||
import org.apache.hadoop.metrics2.MetricsSink;
|
||||
import org.apache.hadoop.metrics2.MetricsTag;
|
||||
import org.apache.hadoop.security.SecurityUtil;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
|
||||
/**
|
||||
* <p>This class is a metrics sink that uses
|
||||
@ -107,6 +109,14 @@
|
||||
* the data is being written successfully. This is a known HDFS limitation that
|
||||
* exists because of the performance cost of updating the metadata. See
|
||||
* <a href="https://issues.apache.org/jira/browse/HDFS-5478">HDFS-5478</a>.</p>
|
||||
*
|
||||
* <p>When using this sink in a secure (Kerberos) environment, two additional
|
||||
* properties must be set: <code>keytab-key</code> and
|
||||
* <code>principal-key</code>. <code>keytab-key</code> should contain the key by
|
||||
* which the keytab file can be found in the configuration, for example,
|
||||
* <code>yarn.nodemanager.keytab</code>. <code>principal-key</code> should
|
||||
* contain the key by which the principal can be found in the configuration,
|
||||
* for example, <code>yarn.nodemanager.principal</code>.
|
||||
*/
|
||||
@InterfaceAudience.Public
|
||||
@InterfaceStability.Evolving
|
||||
@ -115,6 +125,8 @@ public class RollingFileSystemSink implements MetricsSink, Closeable {
|
||||
private static final String SOURCE_KEY = "source";
|
||||
private static final String IGNORE_ERROR_KEY = "ignore-error";
|
||||
private static final String ALLOW_APPEND_KEY = "allow-append";
|
||||
private static final String KEYTAB_PROPERTY_KEY = "keytab-key";
|
||||
private static final String USERNAME_PROPERTY_KEY = "principal-key";
|
||||
private static final String SOURCE_DEFAULT = "unknown";
|
||||
private static final String BASEPATH_DEFAULT = "/tmp";
|
||||
private static final FastDateFormat DATE_FORMAT =
|
||||
@ -134,10 +146,21 @@ public class RollingFileSystemSink implements MetricsSink, Closeable {
|
||||
// We keep this only to be able to call hsynch() on it.
|
||||
private FSDataOutputStream currentFSOutStream;
|
||||
private Timer flushTimer;
|
||||
|
||||
// This flag is used during testing to make the flusher thread run after only
|
||||
// a short pause instead of waiting for the top of the hour.
|
||||
@VisibleForTesting
|
||||
protected static boolean isTest = false;
|
||||
protected static boolean flushQuickly = false;
|
||||
// This flag is used by the flusher thread to indicate that it has run. Used
|
||||
// only for testing purposes.
|
||||
@VisibleForTesting
|
||||
protected static volatile boolean hasFlushed = false;
|
||||
// Use this configuration instead of loading a new one.
|
||||
@VisibleForTesting
|
||||
protected static Configuration suppliedConf = null;
|
||||
// Use this file system instead of getting a new one.
|
||||
@VisibleForTesting
|
||||
protected static FileSystem suppliedFilesystem = null;
|
||||
|
||||
@Override
|
||||
public void init(SubsetConfiguration conf) {
|
||||
@ -146,15 +169,49 @@ public void init(SubsetConfiguration conf) {
|
||||
ignoreError = conf.getBoolean(IGNORE_ERROR_KEY, false);
|
||||
allowAppend = conf.getBoolean(ALLOW_APPEND_KEY, false);
|
||||
|
||||
Configuration configuration = loadConf();
|
||||
|
||||
UserGroupInformation.setConfiguration(configuration);
|
||||
|
||||
// Don't do secure setup if it's not needed.
|
||||
if (UserGroupInformation.isSecurityEnabled()) {
|
||||
// Validate config so that we don't get an NPE
|
||||
checkForProperty(conf, KEYTAB_PROPERTY_KEY);
|
||||
checkForProperty(conf, USERNAME_PROPERTY_KEY);
|
||||
|
||||
|
||||
try {
|
||||
// Login as whoever we're supposed to be and let the hostname be pulled
|
||||
// from localhost. If security isn't enabled, this does nothing.
|
||||
SecurityUtil.login(configuration, conf.getString(KEYTAB_PROPERTY_KEY),
|
||||
conf.getString(USERNAME_PROPERTY_KEY));
|
||||
} catch (IOException ex) {
|
||||
throw new MetricsException("Error logging in securely: ["
|
||||
+ ex.toString() + "]", ex);
|
||||
}
|
||||
}
|
||||
|
||||
fileSystem = getFileSystem(configuration);
|
||||
|
||||
// This step isn't strictly necessary, but it makes debugging issues much
|
||||
// easier. We try to create the base directory eagerly and fail with
|
||||
// copious debug info if it fails.
|
||||
try {
|
||||
fileSystem = FileSystem.get(new URI(basePath.toString()),
|
||||
new Configuration());
|
||||
} catch (URISyntaxException ex) {
|
||||
throw new MetricsException("The supplied filesystem base path URI"
|
||||
+ " is not a valid URI: " + basePath.toString(), ex);
|
||||
} catch (IOException ex) {
|
||||
throw new MetricsException("Error connecting to file system: "
|
||||
+ basePath + " [" + ex.toString() + "]", ex);
|
||||
fileSystem.mkdirs(basePath);
|
||||
} catch (Exception ex) {
|
||||
throw new MetricsException("Failed to create " + basePath + "["
|
||||
+ SOURCE_KEY + "=" + source + ", "
|
||||
+ IGNORE_ERROR_KEY + "=" + ignoreError + ", "
|
||||
+ ALLOW_APPEND_KEY + "=" + allowAppend + ", "
|
||||
+ KEYTAB_PROPERTY_KEY + "="
|
||||
+ conf.getString(KEYTAB_PROPERTY_KEY) + ", "
|
||||
+ conf.getString(KEYTAB_PROPERTY_KEY) + "="
|
||||
+ configuration.get(conf.getString(KEYTAB_PROPERTY_KEY)) + ", "
|
||||
+ USERNAME_PROPERTY_KEY + "="
|
||||
+ conf.getString(USERNAME_PROPERTY_KEY) + ", "
|
||||
+ conf.getString(USERNAME_PROPERTY_KEY) + "="
|
||||
+ configuration.get(conf.getString(USERNAME_PROPERTY_KEY))
|
||||
+ "] -- " + ex.toString(), ex);
|
||||
}
|
||||
|
||||
// If we're permitted to append, check if we actually can
|
||||
@ -165,6 +222,67 @@ public void init(SubsetConfiguration conf) {
|
||||
flushTimer = new Timer("RollingFileSystemSink Flusher", true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Throw a {@link MetricsException} if the given property is not set.
|
||||
*
|
||||
* @param conf the configuration to test
|
||||
* @param key the key to validate
|
||||
*/
|
||||
private static void checkForProperty(SubsetConfiguration conf, String key) {
|
||||
if (!conf.containsKey(key)) {
|
||||
throw new MetricsException("Configuration is missing " + key
|
||||
+ " property");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the supplied configuration for testing or otherwise load a new
|
||||
* configuration.
|
||||
*
|
||||
* @return the configuration to use
|
||||
*/
|
||||
private Configuration loadConf() {
|
||||
Configuration conf;
|
||||
|
||||
if (suppliedConf != null) {
|
||||
conf = suppliedConf;
|
||||
} else {
|
||||
// The config we're handed in init() isn't the one we want here, so we
|
||||
// create a new one to pick up the full settings.
|
||||
conf = new Configuration();
|
||||
}
|
||||
|
||||
return conf;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the supplied file system for testing or otherwise get a new file
|
||||
* system.
|
||||
*
|
||||
* @param conf the configuration
|
||||
* @return the file system to use
|
||||
* @throws MetricsException thrown if the file system could not be retrieved
|
||||
*/
|
||||
private FileSystem getFileSystem(Configuration conf) throws MetricsException {
|
||||
FileSystem fs = null;
|
||||
|
||||
if (suppliedFilesystem != null) {
|
||||
fs = suppliedFilesystem;
|
||||
} else {
|
||||
try {
|
||||
fs = FileSystem.get(new URI(basePath.toString()), conf);
|
||||
} catch (URISyntaxException ex) {
|
||||
throw new MetricsException("The supplied filesystem base path URI"
|
||||
+ " is not a valid URI: " + basePath.toString(), ex);
|
||||
} catch (IOException ex) {
|
||||
throw new MetricsException("Error connecting to file system: "
|
||||
+ basePath + " [" + ex.toString() + "]", ex);
|
||||
}
|
||||
}
|
||||
|
||||
return fs;
|
||||
}
|
||||
|
||||
/**
|
||||
* Test whether the file system supports append and return the answer.
|
||||
* @param fs the target file system
|
||||
@ -232,7 +350,7 @@ private void scheduleFlush(Date now) {
|
||||
|
||||
next.setTime(now);
|
||||
|
||||
if (isTest) {
|
||||
if (flushQuickly) {
|
||||
// If we're running unit tests, flush after a short pause
|
||||
next.add(Calendar.MILLISECOND, 400);
|
||||
} else {
|
||||
|
@ -32,18 +32,18 @@
|
||||
import java.util.Date;
|
||||
import java.util.TimeZone;
|
||||
import java.util.regex.Pattern;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.commons.configuration.SubsetConfiguration;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.FileUtil;
|
||||
import org.apache.hadoop.fs.LocatedFileStatus;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.RemoteIterator;
|
||||
import org.apache.hadoop.metrics2.MetricsException;
|
||||
import org.apache.hadoop.metrics2.MetricsRecord;
|
||||
|
||||
import org.apache.hadoop.metrics2.MetricsSystem;
|
||||
import org.apache.hadoop.metrics2.annotation.Metric;
|
||||
import org.apache.hadoop.metrics2.annotation.Metrics;
|
||||
@ -66,9 +66,11 @@
|
||||
* methods for classes that extend it.
|
||||
*/
|
||||
public class RollingFileSystemSinkTestBase {
|
||||
protected static final String SINK_PRINCIPAL_KEY = "rfssink.principal";
|
||||
protected static final String SINK_KEYTAB_FILE_KEY = "rfssink.keytab";
|
||||
protected static final File ROOT_TEST_DIR =
|
||||
new File(System.getProperty("test.build.data", "target/"),
|
||||
"FileSystemSinkTest");
|
||||
new File(System.getProperty("test.build.data", "target/test"),
|
||||
"RollingFileSystemSinkTest");
|
||||
protected static final SimpleDateFormat DATE_FORMAT =
|
||||
new SimpleDateFormat("yyyyMMddHH");
|
||||
protected static File methodDir;
|
||||
@ -118,8 +120,9 @@ public MyMetrics2 registerWith(MetricsSystem ms) {
|
||||
* Set the date format's timezone to GMT.
|
||||
*/
|
||||
@BeforeClass
|
||||
public static void setTZ() {
|
||||
public static void setup() {
|
||||
DATE_FORMAT.setTimeZone(TimeZone.getTimeZone("GMT"));
|
||||
FileUtil.fullyDelete(ROOT_TEST_DIR);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -128,7 +131,7 @@ public static void setTZ() {
|
||||
*/
|
||||
@AfterClass
|
||||
public static void deleteBaseDir() throws IOException {
|
||||
FileUtils.deleteDirectory(ROOT_TEST_DIR);
|
||||
FileUtil.fullyDelete(ROOT_TEST_DIR);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -139,11 +142,14 @@ public static void deleteBaseDir() throws IOException {
|
||||
public void createMethodDir() throws IOException {
|
||||
methodDir = new File(ROOT_TEST_DIR, methodName.getMethodName());
|
||||
|
||||
methodDir.mkdirs();
|
||||
assertTrue("Test directory already exists: " + methodDir,
|
||||
methodDir.mkdirs());
|
||||
}
|
||||
|
||||
/**
|
||||
* Set up the metrics system, start it, and return it.
|
||||
* Set up the metrics system, start it, and return it. The principal and
|
||||
* keytab properties will not be set.
|
||||
*
|
||||
* @param path the base path for the sink
|
||||
* @param ignoreErrors whether the sink should ignore errors
|
||||
* @param allowAppend whether the sink is allowed to append to existing files
|
||||
@ -151,18 +157,37 @@ public void createMethodDir() throws IOException {
|
||||
*/
|
||||
protected MetricsSystem initMetricsSystem(String path, boolean ignoreErrors,
|
||||
boolean allowAppend) {
|
||||
return initMetricsSystem(path, ignoreErrors, allowAppend, false);
|
||||
}
|
||||
|
||||
/**
|
||||
* Set up the metrics system, start it, and return it.
|
||||
* @param path the base path for the sink
|
||||
* @param ignoreErrors whether the sink should ignore errors
|
||||
* @param allowAppend whether the sink is allowed to append to existing files
|
||||
* @param useSecureParams whether to set the principal and keytab properties
|
||||
* @return the org.apache.hadoop.metrics2.MetricsSystem
|
||||
*/
|
||||
protected MetricsSystem initMetricsSystem(String path, boolean ignoreErrors,
|
||||
boolean allowAppend, boolean useSecureParams) {
|
||||
// If the prefix is not lower case, the metrics system won't be able to
|
||||
// read any of the properties.
|
||||
final String prefix = methodName.getMethodName().toLowerCase();
|
||||
String prefix = methodName.getMethodName().toLowerCase();
|
||||
|
||||
new ConfigBuilder().add("*.period", 10000)
|
||||
ConfigBuilder builder = new ConfigBuilder().add("*.period", 10000)
|
||||
.add(prefix + ".sink.mysink0.class", ErrorSink.class.getName())
|
||||
.add(prefix + ".sink.mysink0.basepath", path)
|
||||
.add(prefix + ".sink.mysink0.source", "testsrc")
|
||||
.add(prefix + ".sink.mysink0.context", "test1")
|
||||
.add(prefix + ".sink.mysink0.ignore-error", ignoreErrors)
|
||||
.add(prefix + ".sink.mysink0.allow-append", allowAppend)
|
||||
.save(TestMetricsConfig.getTestFilename("hadoop-metrics2-" + prefix));
|
||||
.add(prefix + ".sink.mysink0.allow-append", allowAppend);
|
||||
|
||||
if (useSecureParams) {
|
||||
builder.add(prefix + ".sink.mysink0.keytab-key", SINK_KEYTAB_FILE_KEY)
|
||||
.add(prefix + ".sink.mysink0.principal-key", SINK_PRINCIPAL_KEY);
|
||||
}
|
||||
|
||||
builder.save(TestMetricsConfig.getTestFilename("hadoop-metrics2-" + prefix));
|
||||
|
||||
MetricsSystemImpl ms = new MetricsSystemImpl(prefix);
|
||||
|
||||
@ -481,6 +506,17 @@ public void assertFileCount(FileSystem fs, Path dir, int expected)
|
||||
public static class ErrorSink extends RollingFileSystemSink {
|
||||
public static volatile boolean errored = false;
|
||||
|
||||
@Override
|
||||
public void init(SubsetConfiguration conf) {
|
||||
try {
|
||||
super.init(conf);
|
||||
} catch (MetricsException ex) {
|
||||
errored = true;
|
||||
|
||||
throw new MetricsException(ex);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void putMetrics(MetricsRecord record) {
|
||||
try {
|
||||
|
@ -996,6 +996,9 @@ Release 2.9.0 - UNRELEASED
|
||||
|
||||
HDFS-9637. Tests for RollingFileSystemSink. (Daniel Templeton via kasha)
|
||||
|
||||
HDFS-9780. RollingFileSystemSink doesn't work on secure clusters.
|
||||
(Daniel Templeton via kasha)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
BUG FIXES
|
||||
|
@ -20,6 +20,8 @@
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import java.util.Calendar;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
@ -28,11 +30,9 @@
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.metrics2.MetricsSystem;
|
||||
import org.apache.hadoop.metrics2.sink.RollingFileSystemSinkTestBase.MyMetrics1;
|
||||
import org.junit.After;
|
||||
import org.junit.Test;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import org.junit.Before;
|
||||
|
||||
/**
|
||||
* Test the {@link RollingFileSystemSink} class in the context of HDFS.
|
||||
@ -58,7 +58,7 @@ public void setupHdfs() throws IOException {
|
||||
new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES).build();
|
||||
|
||||
// Also clear sink flags
|
||||
RollingFileSystemSink.isTest = false;
|
||||
RollingFileSystemSink.flushQuickly = false;
|
||||
RollingFileSystemSink.hasFlushed = false;
|
||||
}
|
||||
|
||||
@ -251,8 +251,7 @@ public void testSilentFailedClose() throws IOException {
|
||||
*/
|
||||
@Test
|
||||
public void testFlushThread() throws Exception {
|
||||
RollingFileSystemSink.isTest = true;
|
||||
RollingFileSystemSink.hasFlushed = false;
|
||||
RollingFileSystemSink.flushQuickly = true;
|
||||
|
||||
String path = "hdfs://" + cluster.getNameNode().getHostAndPort() + "/tmp";
|
||||
MetricsSystem ms = initMetricsSystem(path, true, false);
|
||||
|
@ -0,0 +1,283 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.metrics2.sink;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.Properties;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_KEY;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_HTTPS_KEYSTORE_RESOURCE_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HTTP_POLICY_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SERVER_HTTPS_KEYSTORE_RESOURCE_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY;
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
|
||||
import org.apache.hadoop.http.HttpConfig;
|
||||
import org.apache.hadoop.metrics2.MetricsSystem;
|
||||
import org.apache.hadoop.minikdc.MiniKdc;
|
||||
import org.apache.hadoop.security.NullGroupsMapping;
|
||||
import org.apache.hadoop.security.SecurityUtil;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
|
||||
import org.junit.Test;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
/**
|
||||
* Test the {@link RollingFileSystemSink} class in the context of HDFS with
|
||||
* Kerberos enabled.
|
||||
*/
|
||||
public class TestRollingFileSystemSinkWithSecureHdfs
|
||||
extends RollingFileSystemSinkTestBase {
|
||||
private static final int NUM_DATANODES = 4;
|
||||
private static MiniKdc kdc;
|
||||
private static String sinkPrincipal;
|
||||
private static String sinkKeytab;
|
||||
private static String hdfsPrincipal;
|
||||
private static String hdfsKeytab;
|
||||
private static String spnegoPrincipal;
|
||||
|
||||
/**
|
||||
* Do a basic write test against an HDFS cluster with Kerberos enabled. We
|
||||
* assume that if a basic write succeeds, more complex operations will also
|
||||
* succeed.
|
||||
*
|
||||
* @throws Exception thrown if things break
|
||||
*/
|
||||
@Test
|
||||
public void testWithSecureHDFS() throws Exception {
|
||||
RollingFileSystemSink.flushQuickly = false;
|
||||
RollingFileSystemSink.hasFlushed = false;
|
||||
initKdc();
|
||||
|
||||
MiniDFSCluster cluster = null;
|
||||
|
||||
try {
|
||||
HdfsConfiguration conf = createSecureConfig("authentication,privacy");
|
||||
|
||||
RollingFileSystemSink.suppliedConf = conf;
|
||||
|
||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES)
|
||||
.build();
|
||||
|
||||
cluster.waitActive();
|
||||
|
||||
UserGroupInformation sink = createDirectoriesSecurely(cluster);
|
||||
final String path =
|
||||
"hdfs://" + cluster.getNameNode().getHostAndPort() + "/tmp/test";
|
||||
final MetricsSystem ms = initMetricsSystem(path, true, false, true);
|
||||
|
||||
assertMetricsContents(
|
||||
sink.doAs(new PrivilegedExceptionAction<String>() {
|
||||
@Override
|
||||
public String run() throws Exception {
|
||||
return doWriteTest(ms, path, 1);
|
||||
}
|
||||
}));
|
||||
} finally {
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
}
|
||||
|
||||
shutdownKdc();
|
||||
|
||||
// Restore non-secure conf
|
||||
UserGroupInformation.setConfiguration(new Configuration());
|
||||
RollingFileSystemSink.suppliedConf = null;
|
||||
RollingFileSystemSink.suppliedFilesystem = null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Do a basic write test against an HDFS cluster with Kerberos enabled but
|
||||
* without the principal and keytab properties set.
|
||||
*
|
||||
* @throws Exception thrown if things break
|
||||
*/
|
||||
@Test
|
||||
public void testMissingPropertiesWithSecureHDFS() throws Exception {
|
||||
RollingFileSystemSink.flushQuickly = false;
|
||||
RollingFileSystemSink.hasFlushed = false;
|
||||
initKdc();
|
||||
|
||||
MiniDFSCluster cluster = null;
|
||||
|
||||
try {
|
||||
HdfsConfiguration conf = createSecureConfig("authentication,privacy");
|
||||
|
||||
RollingFileSystemSink.suppliedConf = conf;
|
||||
|
||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES)
|
||||
.build();
|
||||
|
||||
final String path =
|
||||
"hdfs://" + cluster.getNameNode().getHostAndPort() + "/tmp/test";
|
||||
|
||||
createDirectoriesSecurely(cluster);
|
||||
initMetricsSystem(path, true, false);
|
||||
|
||||
assertTrue("No exception was generated initializing the sink against a "
|
||||
+ "secure cluster even though the principal and keytab properties "
|
||||
+ "were missing", ErrorSink.errored);
|
||||
} finally {
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
}
|
||||
|
||||
shutdownKdc();
|
||||
|
||||
// Restore non-secure conf
|
||||
UserGroupInformation.setConfiguration(new Configuration());
|
||||
RollingFileSystemSink.suppliedConf = null;
|
||||
RollingFileSystemSink.suppliedFilesystem = null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Create the /tmp directory as <i>hdfs</i> and /tmp/test as <i>sink</i> and
|
||||
* return the UGI for <i>sink</i>.
|
||||
*
|
||||
* @param cluster the mini-cluster
|
||||
* @return the UGI for <i>sink</i>
|
||||
* @throws IOException thrown if login or directory creation fails
|
||||
* @throws InterruptedException thrown if interrupted while creating a
|
||||
* file system handle
|
||||
*/
|
||||
protected UserGroupInformation createDirectoriesSecurely(final MiniDFSCluster cluster)
|
||||
throws IOException, InterruptedException {
|
||||
Path tmp = new Path("/tmp");
|
||||
Path test = new Path(tmp, "test");
|
||||
|
||||
UserGroupInformation hdfs =
|
||||
UserGroupInformation.loginUserFromKeytabAndReturnUGI(hdfsPrincipal,
|
||||
hdfsKeytab);
|
||||
FileSystem fsForSuperUser =
|
||||
hdfs.doAs(new PrivilegedExceptionAction<FileSystem>() {
|
||||
@Override
|
||||
public FileSystem run() throws Exception {
|
||||
return cluster.getFileSystem();
|
||||
}
|
||||
});
|
||||
|
||||
fsForSuperUser.mkdirs(tmp);
|
||||
fsForSuperUser.setPermission(tmp, new FsPermission((short)0777));
|
||||
|
||||
UserGroupInformation sink =
|
||||
UserGroupInformation.loginUserFromKeytabAndReturnUGI(sinkPrincipal,
|
||||
sinkKeytab);
|
||||
FileSystem fsForSink =
|
||||
sink.doAs(new PrivilegedExceptionAction<FileSystem>() {
|
||||
@Override
|
||||
public FileSystem run() throws Exception {
|
||||
return cluster.getFileSystem();
|
||||
}
|
||||
});
|
||||
|
||||
fsForSink.mkdirs(test);
|
||||
RollingFileSystemSink.suppliedFilesystem = fsForSink;
|
||||
|
||||
return sink;
|
||||
}
|
||||
|
||||
/**
|
||||
* Setup the KDC for testing a secure HDFS cluster
|
||||
*
|
||||
* @throws Exception thrown if the KDC setup fails
|
||||
*/
|
||||
public static void initKdc() throws Exception {
|
||||
Properties kdcConf = MiniKdc.createConf();
|
||||
kdc = new MiniKdc(kdcConf, methodDir);
|
||||
kdc.start();
|
||||
|
||||
File sinkKeytabFile = new File(methodDir, "sink.keytab");
|
||||
sinkKeytab = sinkKeytabFile.getAbsolutePath();
|
||||
kdc.createPrincipal(sinkKeytabFile, "sink/localhost");
|
||||
sinkPrincipal = "sink/localhost@" + kdc.getRealm();
|
||||
|
||||
File hdfsKeytabFile = new File(methodDir, "hdfs.keytab");
|
||||
hdfsKeytab = hdfsKeytabFile.getAbsolutePath();
|
||||
kdc.createPrincipal(hdfsKeytabFile, "hdfs/localhost",
|
||||
"HTTP/localhost");
|
||||
hdfsPrincipal = "hdfs/localhost@" + kdc.getRealm();
|
||||
spnegoPrincipal = "HTTP/localhost@" + kdc.getRealm();
|
||||
}
|
||||
|
||||
/**
|
||||
* Stop the mini-KDC.
|
||||
*/
|
||||
public static void shutdownKdc() {
|
||||
if (kdc != null) {
|
||||
kdc.stop();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a configuration for starting a secure cluster.
|
||||
*
|
||||
* @param dataTransferProtection supported QOPs
|
||||
* @return configuration for starting a secure cluster
|
||||
* @throws Exception if there is any failure
|
||||
*/
|
||||
protected HdfsConfiguration createSecureConfig(
|
||||
String dataTransferProtection) throws Exception {
|
||||
HdfsConfiguration conf = new HdfsConfiguration();
|
||||
|
||||
SecurityUtil.setAuthenticationMethod(
|
||||
UserGroupInformation.AuthenticationMethod.KERBEROS, conf);
|
||||
conf.set(DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY, hdfsPrincipal);
|
||||
conf.set(DFS_NAMENODE_KEYTAB_FILE_KEY, hdfsKeytab);
|
||||
conf.set(DFS_DATANODE_KERBEROS_PRINCIPAL_KEY, hdfsPrincipal);
|
||||
conf.set(DFS_DATANODE_KEYTAB_FILE_KEY, hdfsKeytab);
|
||||
conf.set(SINK_PRINCIPAL_KEY, sinkPrincipal);
|
||||
conf.set(SINK_KEYTAB_FILE_KEY, sinkKeytab);
|
||||
conf.set(DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY, spnegoPrincipal);
|
||||
conf.setBoolean(DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
|
||||
conf.set(DFS_DATA_TRANSFER_PROTECTION_KEY, dataTransferProtection);
|
||||
conf.set(DFS_HTTP_POLICY_KEY, HttpConfig.Policy.HTTPS_ONLY.name());
|
||||
conf.set(DFS_NAMENODE_HTTPS_ADDRESS_KEY, "localhost:0");
|
||||
conf.set(DFS_DATANODE_HTTPS_ADDRESS_KEY, "localhost:0");
|
||||
conf.setInt(IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_KEY, 10);
|
||||
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_GROUP_MAPPING,
|
||||
NullGroupsMapping.class.getName());
|
||||
|
||||
String keystoresDir = methodDir.getAbsolutePath();
|
||||
String sslConfDir = KeyStoreTestUtil.getClasspathDir(this.getClass());
|
||||
|
||||
KeyStoreTestUtil.setupSSLConfig(keystoresDir, sslConfDir, conf, false);
|
||||
conf.set(DFS_CLIENT_HTTPS_KEYSTORE_RESOURCE_KEY,
|
||||
KeyStoreTestUtil.getClientSSLConfigFileName());
|
||||
conf.set(DFS_SERVER_HTTPS_KEYSTORE_RESOURCE_KEY,
|
||||
KeyStoreTestUtil.getServerSSLConfigFileName());
|
||||
|
||||
return conf;
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user