From 57c31a3fef625f1ec609d7e8873d4941f7ed5cbc Mon Sep 17 00:00:00 2001
From: Karthik Kambatla This class is a metrics sink that uses
* {@link org.apache.hadoop.fs.FileSystem} to write the metrics logs. Every
- * hour a new directory will be created under the path specified by the
+ * roll interval a new directory will be created under the path specified by the
* basepath
property. All metrics will be logged to a file in the
- * current hour's directory in a file named <hostname>.log, where
+ * current interval's directory in a file named <hostname>.log, where
* <hostname> is the name of the host on which the metrics logging
* process is running. The base path is set by the
* <prefix>.sink.<instance>.basepath
property. The
- * time zone used to create the current hour's directory name is GMT. If the
- * basepath
property isn't specified, it will default to
+ * time zone used to create the current interval's directory name is GMT. If
+ * the basepath
property isn't specified, it will default to
* "/tmp", which is the temp directory on whatever default file
* system is configured for the cluster.true
. When set to
* false
, file errors are quietly swallowed.
The roll-interval
property sets the amount of time before
+ * rolling the directory. The default value is 1 hour. The roll interval may
+ * not be less than 1 minute. The property's value should be given as
+ * number unit, where number is an integer value, and
+ * unit is a valid unit. Valid units are minute, hour,
+ * and day. The units are case insensitive and may be abbreviated or
+ * plural. If no units are specified, hours are assumed. For example,
+ * "2", "2h", "2 hour", and
+ * "2 hours" are all valid ways to specify two hours.
The roll-offset-interval-millis
property sets the upper
+ * bound on a random time interval (in milliseconds) that is used to delay
+ * before the initial roll. All subsequent rolls will happen an integer
+ * number of roll intervals after the initial roll, hence retaining the original
+ * offset. The purpose of this property is to insert some variance in the roll
+ * times so that large clusters using this sink on every node don't cause a
+ * performance impact on HDFS by rolling simultaneously. The default value is
+ * 30000 (30s). When writing to HDFS, as a rule of thumb, the roll offset in
+ * millis should be no less than the number of sink instances times 5.
+ *
*
The primary use of this class is for logging to HDFS. As it uses * {@link org.apache.hadoop.fs.FileSystem} to access the target file system, * however, it can be used to write to the local file system, Amazon S3, or any @@ -79,7 +103,8 @@ *
Not all file systems support the ability to append to files. In file
* systems without the ability to append to files, only one writer can write to
* a file at a time. To allow for concurrent writes from multiple daemons on a
- * single host, the source
property should be set to the name of
+ * single host, the source
property is used to set unique headers
+ * for the log files. The property should be set to the name of
* the source daemon, e.g. namenode. The value of the
* source
property should typically be the same as the property's
* prefix. If this property is not set, the source is taken to be
@@ -105,7 +130,7 @@
* 3.
Note also that when writing to HDFS, the file size information is not - * updated until the file is closed (e.g. at the top of the hour) even though + * updated until the file is closed (at the end of the interval) even though * the data is being written successfully. This is a known HDFS limitation that * exists because of the performance cost of updating the metadata. See * HDFS-5478.
@@ -124,21 +149,32 @@ public class RollingFileSystemSink implements MetricsSink, Closeable { private static final String BASEPATH_KEY = "basepath"; private static final String SOURCE_KEY = "source"; private static final String IGNORE_ERROR_KEY = "ignore-error"; + private static final boolean DEFAULT_IGNORE_ERROR = false; private static final String ALLOW_APPEND_KEY = "allow-append"; + private static final boolean DEFAULT_ALLOW_APPEND = false; private static final String KEYTAB_PROPERTY_KEY = "keytab-key"; private static final String USERNAME_PROPERTY_KEY = "principal-key"; + private static final String ROLL_INTERVAL_KEY = "roll-interval"; + private static final String DEFAULT_ROLL_INTERVAL = "1h"; + private static final String ROLL_OFFSET_INTERVAL_MILLIS_KEY = + "roll-offset-interval-millis"; + private static final int DEFAULT_ROLL_OFFSET_INTERVAL_MILLIS = 30000; private static final String SOURCE_DEFAULT = "unknown"; private static final String BASEPATH_DEFAULT = "/tmp"; private static final FastDateFormat DATE_FORMAT = - FastDateFormat.getInstance("yyyyMMddHH", TimeZone.getTimeZone("GMT")); + FastDateFormat.getInstance("yyyyMMddHHmm", TimeZone.getTimeZone("GMT")); private final Object lock = new Object(); private boolean initialized = false; private SubsetConfiguration properties; private Configuration conf; - private String source; - private boolean ignoreError; - private boolean allowAppend; - private Path basePath; + @VisibleForTesting + protected String source; + @VisibleForTesting + protected boolean ignoreError; + @VisibleForTesting + protected boolean allowAppend; + @VisibleForTesting + protected Path basePath; private FileSystem fileSystem; // The current directory path into which we're writing files private Path currentDirPath; @@ -149,11 +185,21 @@ public class RollingFileSystemSink implements MetricsSink, Closeable { // We keep this only to be able to call hsynch() on it. 
private FSDataOutputStream currentFSOutStream; private Timer flushTimer; - - // This flag is used during testing to make the flusher thread run after only - // a short pause instead of waiting for the top of the hour. + // The amount of time between rolls @VisibleForTesting - protected static boolean flushQuickly = false; + protected long rollIntervalMillis; + // The maximum amount of random time to add to the initial roll + @VisibleForTesting + protected long rollOffsetIntervalMillis; + // The time for the nextFlush + @VisibleForTesting + protected Calendar nextFlush = null; + // This flag when true causes a metrics write to schedule a flush thread to + // run immediately, but only if a flush thread is already scheduled. (It's a + // timing thing. If the first write forces the flush, it will strand the + // second write.) + @VisibleForTesting + protected static boolean forceFlush = false; // This flag is used by the flusher thread to indicate that it has run. Used // only for testing purposes. @VisibleForTesting @@ -165,13 +211,36 @@ public class RollingFileSystemSink implements MetricsSink, Closeable { @VisibleForTesting protected static FileSystem suppliedFilesystem = null; + /** + * Create an empty instance. Required for reflection. + */ + public RollingFileSystemSink() { + } + + /** + * Create an instance for testing. 
+ * + * @param flushIntervalMillis the roll interval in millis + * @param flushOffsetIntervalMillis the roll offset interval in millis + */ + @VisibleForTesting + protected RollingFileSystemSink(long flushIntervalMillis, + long flushOffsetIntervalMillis) { + this.rollIntervalMillis = flushIntervalMillis; + this.rollOffsetIntervalMillis = flushOffsetIntervalMillis; + } + @Override public void init(SubsetConfiguration metrics2Properties) { properties = metrics2Properties; basePath = new Path(properties.getString(BASEPATH_KEY, BASEPATH_DEFAULT)); source = properties.getString(SOURCE_KEY, SOURCE_DEFAULT); - ignoreError = properties.getBoolean(IGNORE_ERROR_KEY, false); - allowAppend = properties.getBoolean(ALLOW_APPEND_KEY, false); + ignoreError = properties.getBoolean(IGNORE_ERROR_KEY, DEFAULT_IGNORE_ERROR); + allowAppend = properties.getBoolean(ALLOW_APPEND_KEY, DEFAULT_ALLOW_APPEND); + rollOffsetIntervalMillis = + getNonNegative(ROLL_OFFSET_INTERVAL_MILLIS_KEY, + DEFAULT_ROLL_OFFSET_INTERVAL_MILLIS); + rollIntervalMillis = getRollInterval(); conf = loadConf(); UserGroupInformation.setConfiguration(conf); @@ -179,8 +248,8 @@ public void init(SubsetConfiguration metrics2Properties) { // Don't do secure setup if it's not needed. if (UserGroupInformation.isSecurityEnabled()) { // Validate config so that we don't get an NPE - checkForProperty(properties, KEYTAB_PROPERTY_KEY); - checkForProperty(properties, USERNAME_PROPERTY_KEY); + checkIfPropertyExists(KEYTAB_PROPERTY_KEY); + checkIfPropertyExists(USERNAME_PROPERTY_KEY); try { @@ -228,6 +297,7 @@ private boolean initFs() { } flushTimer = new Timer("RollingFileSystemSink Flusher", true); + setInitialFlushTime(new Date()); } return success; @@ -238,8 +308,6 @@ private boolean initFs() { * strings, allowing for either the property or the configuration not to be * set. 
* - * @param properties the sink properties - * @param conf the conf * @param property the property to stringify * @return the stringified property */ @@ -264,15 +332,98 @@ private String stringifySecurityProperty(String property) { return securityProperty; } + /** + * Extract the roll interval from the configuration and return it in + * milliseconds. + * + * @return the roll interval in millis + */ + @VisibleForTesting + protected long getRollInterval() { + String rollInterval = + properties.getString(ROLL_INTERVAL_KEY, DEFAULT_ROLL_INTERVAL); + Pattern pattern = Pattern.compile("^\\s*(\\d+)\\s*([A-Za-z]*)\\s*$"); + Matcher match = pattern.matcher(rollInterval); + long millis; + + if (match.matches()) { + String flushUnit = match.group(2); + int rollIntervalInt; + + try { + rollIntervalInt = Integer.parseInt(match.group(1)); + } catch (NumberFormatException ex) { + throw new MetricsException("Unrecognized flush interval: " + + rollInterval + ". Must be a number followed by an optional " + + "unit. The unit must be one of: minute, hour, day", ex); + } + + if ("".equals(flushUnit)) { + millis = TimeUnit.HOURS.toMillis(rollIntervalInt); + } else { + switch (flushUnit.toLowerCase()) { + case "m": + case "min": + case "minute": + case "minutes": + millis = TimeUnit.MINUTES.toMillis(rollIntervalInt); + break; + case "h": + case "hr": + case "hour": + case "hours": + millis = TimeUnit.HOURS.toMillis(rollIntervalInt); + break; + case "d": + case "day": + case "days": + millis = TimeUnit.DAYS.toMillis(rollIntervalInt); + break; + default: + throw new MetricsException("Unrecognized unit for flush interval: " + + flushUnit + ". Must be one of: minute, hour, day"); + } + } + } else { + throw new MetricsException("Unrecognized flush interval: " + + rollInterval + ". Must be a number followed by an optional unit." 
+ + " The unit must be one of: minute, hour, day"); + } + + if (millis < 60000) { + throw new MetricsException("The flush interval property must be " + + "at least 1 minute. Value was " + rollInterval); + } + + return millis; + } + + /** + * Return the property value if it's non-negative and throw an exception if + * it's not. + * + * @param key the property key + * @param defaultValue the default value + */ + private long getNonNegative(String key, int defaultValue) { + int flushOffsetIntervalMillis = properties.getInt(key, defaultValue); + + if (flushOffsetIntervalMillis < 0) { + throw new MetricsException("The " + key + " property must be " + + "non-negative. Value was " + flushOffsetIntervalMillis); + } + + return flushOffsetIntervalMillis; + } + /** * Throw a {@link MetricsException} if the given property is not set. * - * @param conf the configuration to test * @param key the key to validate */ - private static void checkForProperty(SubsetConfiguration conf, String key) { - if (!conf.containsKey(key)) { - throw new MetricsException("Configuration is missing " + key + private void checkIfPropertyExists(String key) { + if (!properties.containsKey(key)) { + throw new MetricsException("Metrics2 configuration is missing " + key + " property"); } } @@ -301,7 +452,6 @@ private Configuration loadConf() { * Return the supplied file system for testing or otherwise get a new file * system. * - * @param conf the configuration * @return the file system to use * @throws MetricsException thrown if the file system could not be retrieved */ @@ -327,6 +477,7 @@ private FileSystem getFileSystem() throws MetricsException { /** * Test whether the file system supports append and return the answer. 
+ * * @param fs the target file system */ private boolean checkAppend(FileSystem fs) { @@ -351,14 +502,14 @@ private boolean checkAppend(FileSystem fs) { * new directory or new log file */ private void rollLogDirIfNeeded() throws MetricsException { + // Because we're working relative to the clock, we use a Date instead + // of Time.monotonicNow(). Date now = new Date(); - String currentDir = DATE_FORMAT.format(now); - Path path = new Path(basePath, currentDir); // We check whether currentOutStream is null instead of currentDirPath, // because if currentDirPath is null, then currentOutStream is null, but - // currentOutStream can be null for other reasons. - if ((currentOutStream == null) || !path.equals(currentDirPath)) { + // currentOutStream can be null for other reasons. Same for nextFlush. + if ((currentOutStream == null) || now.after(nextFlush.getTime())) { // If we're not yet connected to HDFS, create the connection if (!initialized) { initialized = initFs(); @@ -372,7 +523,7 @@ private void rollLogDirIfNeeded() throws MetricsException { currentOutStream.close(); } - currentDirPath = path; + currentDirPath = findCurrentDirectory(now); try { rollLogDir(); @@ -380,34 +531,41 @@ private void rollLogDirIfNeeded() throws MetricsException { throwMetricsException("Failed to create new log file", ex); } - scheduleFlush(now); + // Update the time of the next flush + updateFlushTime(now); + // Schedule the next flush at that time + scheduleFlush(nextFlush.getTime()); } + } else if (forceFlush) { + scheduleFlush(new Date()); } } /** - * Schedule the current hour's directory to be flushed at the top of the next - * hour. If this ends up running after the top of the next hour, it will - * execute immediately. + * Use the given time to determine the current directory. The current + * directory will be based on the {@link #rollIntervalMillis}. 
* * @param now the current time + * @return the current directory */ - private void scheduleFlush(Date now) { + private Path findCurrentDirectory(Date now) { + long offset = ((now.getTime() - nextFlush.getTimeInMillis()) + / rollIntervalMillis) * rollIntervalMillis; + String currentDir = + DATE_FORMAT.format(new Date(nextFlush.getTimeInMillis() + offset)); + + return new Path(basePath, currentDir); + } + + /** + * Schedule the current interval's directory to be flushed. If this ends up + * running after the top of the next interval, it will execute immediately. + * + * @param when the time the thread should run + */ + private void scheduleFlush(Date when) { // Store the current currentDirPath to close later final PrintStream toClose = currentOutStream; - Calendar next = Calendar.getInstance(); - - next.setTime(now); - - if (flushQuickly) { - // If we're running unit tests, flush after a short pause - next.add(Calendar.MILLISECOND, 400); - } else { - // Otherwise flush at the top of the hour - next.set(Calendar.SECOND, 0); - next.set(Calendar.MINUTE, 0); - next.add(Calendar.HOUR, 1); - } flushTimer.schedule(new TimerTask() { @Override @@ -420,11 +578,81 @@ public void run() { hasFlushed = true; } - }, next.getTime()); + }, when); } /** - * Create a new directory based on the current hour and a new log file in + * Update the {@link #nextFlush} variable to the next flush time. Add + * an integer number of flush intervals, preserving the initial random offset. + * + * @param now the current time + */ + @VisibleForTesting + protected void updateFlushTime(Date now) { + // In non-initial rounds, add an integer number of intervals to the last + // flush until a time in the future is achieved, thus preserving the + // original random offset. 
+ int millis = + (int) (((now.getTime() - nextFlush.getTimeInMillis()) + / rollIntervalMillis + 1) * rollIntervalMillis); + + nextFlush.add(Calendar.MILLISECOND, millis); + } + + /** + * Set the {@link #nextFlush} variable to the initial flush time. The initial + * flush will be an integer number of flush intervals past the beginning of + * the current hour and will have a random offset added, up to + * {@link #rollOffsetIntervalMillis}. The initial flush will be a time in + * past that can be used from which to calculate future flush times. + * + * @param now the current time + */ + @VisibleForTesting + protected void setInitialFlushTime(Date now) { + // Start with the beginning of the current hour + nextFlush = Calendar.getInstance(); + nextFlush.setTime(now); + nextFlush.set(Calendar.MILLISECOND, 0); + nextFlush.set(Calendar.SECOND, 0); + nextFlush.set(Calendar.MINUTE, 0); + + // In the first round, calculate the first flush as the largest number of + // intervals from the beginning of the current hour that's not in the + // future by: + // 1. Subtract the beginning of the hour from the current time + // 2. Divide by the roll interval and round down to get the number of whole + // intervals that have passed since the beginning of the hour + // 3. Multiply by the roll interval to get the number of millis between + // the beginning of the current hour and the beginning of the current + // interval. + int millis = (int) (((now.getTime() - nextFlush.getTimeInMillis()) + / rollIntervalMillis) * rollIntervalMillis); + + // Then add some noise to help prevent all the nodes from + // closing their files at the same time. + if (rollOffsetIntervalMillis > 0) { + millis += ThreadLocalRandom.current().nextLong(rollOffsetIntervalMillis); + + // If the added time puts us into the future, step back one roll interval + // because the code to increment nextFlush to the next flush expects that + // nextFlush is the next flush from the previous interval. 
There wasn't + // a previous interval, so we just fake it with the time in the past that + // would have been the previous interval if there had been one. + // + // It's OK if millis comes out negative. + while (nextFlush.getTimeInMillis() + millis > now.getTime()) { + millis -= rollIntervalMillis; + } + } + + // Adjust the next flush time by millis to get the time of our fictitious + // previous next flush + nextFlush.add(Calendar.MILLISECOND, millis); + } + + /** + * Create a new directory based on the current interval and a new log file in * that directory. * * @throws IOException thrown if an error occurs while creating the @@ -451,7 +679,8 @@ private void rollLogDir() throws IOException { * path is found. * * Once the file is open, update {@link #currentFSOutStream}, - * {@link #currentOutStream}, and {@#link #currentFile} are set appropriately. + * {@link #currentOutStream}, and {@link #currentFilePath} are set + * appropriately. * * @param initial the target path * @throws IOException thrown if the call to see if the exists fails @@ -552,7 +781,7 @@ private int extractId(String file) { * instead. * * Once the file is open, update {@link #currentFSOutStream}, - * {@link #currentOutStream}, and {@#link #currentFile} are set appropriately. + * {@link #currentOutStream}, and {@link #currentFilePath}. * * @param initial the target path * @throws IOException thrown if the call to see the append operation fails. @@ -615,9 +844,9 @@ public void putMetrics(MetricsRecord record) { currentOutStream.println(); // If we don't hflush(), the data may not be written until the file is - // closed. The file won't be closed until the top of the hour *AND* + // closed. The file won't be closed until the end of the interval *AND* // another record is received. Calling hflush() makes sure that the data - // is complete at the top of the hour. + // is complete at the end of the interval. 
try { currentFSOutStream.hflush(); } catch (IOException ex) { @@ -668,8 +897,8 @@ public void close() { * as the new exception's message with the current file name * ({@link #currentFilePath}) appended to it. * - * @param message the exception message. The message will have the current - * file name ({@link #currentFilePath}) appended to it. + * @param message the exception message. The message will have a colon and + * the current file name ({@link #currentFilePath}) appended to it. * @throws MetricsException thrown if there was an error and the sink isn't * ignoring errors */ @@ -687,9 +916,9 @@ private void checkForErrors(String message) * ({@link #currentFilePath}) and the Throwable's string representation * appended to it. * - * @param message the exception message. The message will have the current - * file name ({@link #currentFilePath}) and the Throwable's string - * representation appended to it. + * @param message the exception message. The message will have a colon, the + * current file name ({@link #currentFilePath}), and the Throwable's string + * representation (wrapped in square brackets) appended to it. * @param t the Throwable to wrap */ private void throwMetricsException(String message, Throwable t) { @@ -705,8 +934,8 @@ private void throwMetricsException(String message, Throwable t) { * new exception's message with the current file name * ({@link #currentFilePath}) appended to it. * - * @param message the exception message. The message will have the current - * file name ({@link #currentFilePath}) appended to it. + * @param message the exception message. The message will have a colon and + * the current file name ({@link #currentFilePath}) appended to it. 
*/ private void throwMetricsException(String message) { if (!ignoreError) { diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/RollingFileSystemSinkTestBase.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/RollingFileSystemSinkTestBase.java index b65a759686..4ac10f2763 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/RollingFileSystemSinkTestBase.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/RollingFileSystemSinkTestBase.java @@ -180,10 +180,12 @@ protected MetricsSystem initMetricsSystem(String path, boolean ignoreErrors, .add(prefix + ".sink.mysink0.source", "testsrc") .add(prefix + ".sink.mysink0.context", "test1") .add(prefix + ".sink.mysink0.ignore-error", ignoreErrors) - .add(prefix + ".sink.mysink0.allow-append", allowAppend); + .add(prefix + ".sink.mysink0.allow-append", allowAppend) + .add(prefix + ".sink.mysink0.roll-offset-interval-millis", 0) + .add(prefix + ".sink.mysink0.roll-interval", "1h"); if (useSecureParams) { - builder.add(prefix + ".sink.mysink0.keytab-key", SINK_KEYTAB_FILE_KEY) + builder.add(prefix + ".sink.mysink0.keytab-key", SINK_KEYTAB_FILE_KEY) .add(prefix + ".sink.mysink0.principal-key", SINK_PRINCIPAL_KEY); } @@ -210,7 +212,7 @@ protected MetricsSystem initMetricsSystem(String path, boolean ignoreErrors, */ protected String doWriteTest(MetricsSystem ms, String path, int count) throws IOException, URISyntaxException { - final String then = DATE_FORMAT.format(new Date()); + final String then = DATE_FORMAT.format(new Date()) + "00"; MyMetrics1 mm1 = new MyMetrics1().registerWith(ms); new MyMetrics2().registerWith(ms); @@ -239,7 +241,7 @@ protected String doWriteTest(MetricsSystem ms, String path, int count) */ protected String readLogFile(String path, String then, int count) throws IOException, URISyntaxException { - final String now = DATE_FORMAT.format(new 
Date()); + final String now = DATE_FORMAT.format(new Date()) + "00"; final String logFile = getLogFilename(); FileSystem fs = FileSystem.get(new URI(path), new Configuration()); StringBuilder metrics = new StringBuilder(); @@ -426,7 +428,7 @@ protected void preCreateLogFile(String path, int numFiles) Calendar now = getNowNotTopOfHour(); FileSystem fs = FileSystem.get(new URI(path), new Configuration()); - Path dir = new Path(path, DATE_FORMAT.format(now.getTime())); + Path dir = new Path(path, DATE_FORMAT.format(now.getTime()) + "00"); fs.mkdirs(dir); @@ -494,8 +496,8 @@ public void assertFileCount(FileSystem fs, Path dir, int expected) } assertTrue("The sink created additional unexpected log files. " + count - + "files were created", expected >= count); - assertTrue("The sink created too few log files. " + count + "files were " + + " files were created", expected >= count); + assertTrue("The sink created too few log files. " + count + " files were " + "created", expected <= count); } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/TestRollingFileSystemSink.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/TestRollingFileSystemSink.java index 3c6cd27b59..9c34dbad2f 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/TestRollingFileSystemSink.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/TestRollingFileSystemSink.java @@ -18,141 +18,244 @@ package org.apache.hadoop.metrics2.sink; -import org.apache.hadoop.metrics2.MetricsSystem; +import java.util.Calendar; +import org.apache.commons.configuration.SubsetConfiguration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.metrics2.MetricsException; +import org.apache.hadoop.metrics2.impl.ConfigBuilder; import org.junit.Test; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static 
org.junit.Assert.assertFalse; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; /** - * Test the {@link RollingFileSystemSink} class in the context of the local file - * system. + * Test that the init() method picks up all the configuration settings + * correctly. */ -public class TestRollingFileSystemSink extends RollingFileSystemSinkTestBase { - /** - * Test writing logs to the local file system. - * @throws Exception when things break - */ +public class TestRollingFileSystemSink { @Test - public void testWrite() throws Exception { - String path = methodDir.getAbsolutePath(); - MetricsSystem ms = initMetricsSystem(path, false, false); + public void testInit() { + ConfigBuilder builder = new ConfigBuilder(); + SubsetConfiguration conf = + builder.add("sink.roll-interval", "10m") + .add("sink.roll-offset-interval-millis", "1") + .add("sink.basepath", "path") + .add("sink.ignore-error", "true") + .add("sink.allow-append", "true") + .add("sink.source", "src") + .subset("sink"); - assertMetricsContents(doWriteTest(ms, path, 1)); + RollingFileSystemSink sink = new RollingFileSystemSink(); + + sink.init(conf); + + assertEquals("The roll interval was not set correctly", + sink.rollIntervalMillis, 600000); + assertEquals("The roll offset interval was not set correctly", + sink.rollOffsetIntervalMillis, 1); + assertEquals("The base path was not set correctly", + sink.basePath, new Path("path")); + assertEquals("ignore-error was not set correctly", + sink.ignoreError, true); + assertEquals("allow-append was not set correctly", + sink.allowAppend, true); + assertEquals("The source was not set correctly", + sink.source, "src"); } /** - * Test writing logs to the local file system with the sink set to ignore - * errors. - * @throws Exception when things break + * Test whether the initial roll interval is set correctly. 
*/ @Test - public void testSilentWrite() throws Exception { - String path = methodDir.getAbsolutePath(); - MetricsSystem ms = initMetricsSystem(path, true, false); + public void testSetInitialFlushTime() { + RollingFileSystemSink rfsSink = new RollingFileSystemSink(1000, 0); + Calendar calendar = Calendar.getInstance(); - assertMetricsContents(doWriteTest(ms, path, 1)); + calendar.set(Calendar.MILLISECOND, 0); + calendar.set(Calendar.SECOND, 0); + calendar.set(Calendar.MINUTE, 0); + calendar.set(Calendar.HOUR, 0); + calendar.set(Calendar.DAY_OF_YEAR, 1); + calendar.set(Calendar.YEAR, 2016); + + assertNull("Last flush time should have been null prior to calling init()", + rfsSink.nextFlush); + + rfsSink.setInitialFlushTime(calendar.getTime()); + + long diff = + rfsSink.nextFlush.getTimeInMillis() - calendar.getTimeInMillis(); + + assertEquals("The initial flush time was calculated incorrectly", 0L, diff); + + calendar.set(Calendar.MILLISECOND, 10); + rfsSink.setInitialFlushTime(calendar.getTime()); + diff = rfsSink.nextFlush.getTimeInMillis() - calendar.getTimeInMillis(); + + assertEquals("The initial flush time was calculated incorrectly", + -10L, diff); + + calendar.set(Calendar.SECOND, 1); + calendar.set(Calendar.MILLISECOND, 10); + rfsSink.setInitialFlushTime(calendar.getTime()); + diff = rfsSink.nextFlush.getTimeInMillis() - calendar.getTimeInMillis(); + + assertEquals("The initial flush time was calculated incorrectly", + -10L, diff); + + // Try again with a random offset + rfsSink = new RollingFileSystemSink(1000, 100); + + assertNull("Last flush time should have been null prior to calling init()", + rfsSink.nextFlush); + + calendar.set(Calendar.MILLISECOND, 0); + calendar.set(Calendar.SECOND, 0); + rfsSink.setInitialFlushTime(calendar.getTime()); + + diff = rfsSink.nextFlush.getTimeInMillis() - calendar.getTimeInMillis(); + + assertTrue("The initial flush time was calculated incorrectly: " + diff, + (diff >= -1000L) && (diff < -900L)); + + 
calendar.set(Calendar.MILLISECOND, 10); + rfsSink.setInitialFlushTime(calendar.getTime()); + diff = rfsSink.nextFlush.getTimeInMillis() - calendar.getTimeInMillis(); + + assertTrue("The initial flush time was calculated incorrectly: " + diff, + ((diff >= -10L) && (diff <= 0L) || + ((diff > -1000L) && (diff < -910L)))); + + calendar.set(Calendar.SECOND, 1); + calendar.set(Calendar.MILLISECOND, 10); + rfsSink.setInitialFlushTime(calendar.getTime()); + diff = rfsSink.nextFlush.getTimeInMillis() - calendar.getTimeInMillis(); + + assertTrue("The initial flush time was calculated incorrectly: " + diff, + ((diff >= -10L) && (diff <= 0L) || + ((diff > -1000L) && (diff < -910L)))); + + // Now try pathological settings + rfsSink = new RollingFileSystemSink(1000, 1000000); + + assertNull("Last flush time should have been null prior to calling init()", + rfsSink.nextFlush); + + calendar.set(Calendar.MILLISECOND, 1); + calendar.set(Calendar.SECOND, 0); + rfsSink.setInitialFlushTime(calendar.getTime()); + + diff = rfsSink.nextFlush.getTimeInMillis() - calendar.getTimeInMillis(); + + assertTrue("The initial flush time was calculated incorrectly: " + diff, + (diff > -1000L) && (diff <= 0L)); } /** - * Test writing logs to HDFS when the log file already exists. - * - * @throws Exception when things break + * Test that the roll time updates correctly. 
*/ @Test - public void testExistingWrite() throws Exception { - String path = methodDir.getAbsolutePath(); + public void testUpdateRollTime() { + RollingFileSystemSink rfsSink = new RollingFileSystemSink(1000, 0); + Calendar calendar = Calendar.getInstance(); - assertMetricsContents(doAppendTest(path, false, false, 2)); + calendar.set(Calendar.MILLISECOND, 0); + calendar.set(Calendar.SECOND, 0); + calendar.set(Calendar.MINUTE, 0); + calendar.set(Calendar.HOUR, 0); + calendar.set(Calendar.DAY_OF_YEAR, 1); + calendar.set(Calendar.YEAR, 2016); + + rfsSink.nextFlush = Calendar.getInstance(); + rfsSink.nextFlush.setTime(calendar.getTime()); + rfsSink.updateFlushTime(calendar.getTime()); + + assertEquals("The next roll time should have been 1 second in the future", + calendar.getTimeInMillis() + 1000, + rfsSink.nextFlush.getTimeInMillis()); + + rfsSink.nextFlush.setTime(calendar.getTime()); + calendar.add(Calendar.MILLISECOND, 10); + rfsSink.updateFlushTime(calendar.getTime()); + + assertEquals("The next roll time should have been 990 ms in the future", + calendar.getTimeInMillis() + 990, + rfsSink.nextFlush.getTimeInMillis()); + + rfsSink.nextFlush.setTime(calendar.getTime()); + calendar.add(Calendar.SECOND, 2); + calendar.add(Calendar.MILLISECOND, 10); + rfsSink.updateFlushTime(calendar.getTime()); + + assertEquals("The next roll time should have been 990 ms in the future", + calendar.getTimeInMillis() + 990, + rfsSink.nextFlush.getTimeInMillis()); } /** - * Test writing logs to HDFS when the log file and the .1 log file already - * exist. - * - * @throws Exception when things break + * Test whether the roll interval is correctly calculated from the + * configuration settings. 
*/ @Test - public void testExistingWrite2() throws Exception { - String path = methodDir.getAbsolutePath(); - MetricsSystem ms = initMetricsSystem(path, false, false); + public void testGetRollInterval() { + doTestGetRollInterval(1, new String[] {"m", "min", "minute", "minutes"}, + 60 * 1000L); + doTestGetRollInterval(1, new String[] {"h", "hr", "hour", "hours"}, + 60 * 60 * 1000L); + doTestGetRollInterval(1, new String[] {"d", "day", "days"}, + 24 * 60 * 60 * 1000L); - preCreateLogFile(path, 2); + ConfigBuilder builder = new ConfigBuilder(); + SubsetConfiguration conf = + builder.add("sink.roll-interval", "1").subset("sink"); + // We can reuse the same sink every time because we're setting the same + // property every time. + RollingFileSystemSink sink = new RollingFileSystemSink(); - assertMetricsContents(doWriteTest(ms, path, 3)); - } + sink.init(conf); - /** - * Test writing logs to HDFS with ignore errors enabled when - * the log file already exists. - * - * @throws Exception when things break - */ - @Test - public void testSilentExistingWrite() throws Exception { - String path = methodDir.getAbsolutePath(); + assertEquals(3600000L, sink.getRollInterval()); - assertMetricsContents(doAppendTest(path, false, false, 2)); - } + for (char c : "abcefgijklnopqrtuvwxyz".toCharArray()) { + builder = new ConfigBuilder(); + conf = builder.add("sink.roll-interval", "90 " + c).subset("sink"); - /** - * Test that writing fails when the directory isn't writable. 
- */ - @Test - public void testFailedWrite() { - String path = methodDir.getAbsolutePath(); - MetricsSystem ms = initMetricsSystem(path, false, false); - - new MyMetrics1().registerWith(ms); - - methodDir.setWritable(false); - MockSink.errored = false; - - try { - // publish the metrics - ms.publishMetricsNow(); - - assertTrue("No exception was generated while writing metrics " - + "even though the target directory was not writable", - MockSink.errored); - - ms.stop(); - ms.shutdown(); - } finally { - // Make sure the dir is writable again so we can delete it at the end - methodDir.setWritable(true); + try { + sink.init(conf); + sink.getRollInterval(); + fail("Allowed flush interval with bad units: " + c); + } catch (MetricsException ex) { + // Expected + } } } /** - * Test that writing fails silently when the directory is not writable. + * Test the basic unit conversions with the given unit name modifier applied. + * + * @param mod a unit name modifier */ - @Test - public void testSilentFailedWrite() { - String path = methodDir.getAbsolutePath(); - MetricsSystem ms = initMetricsSystem(path, true, false); + private void doTestGetRollInterval(int num, String[] units, long expected) { + RollingFileSystemSink sink = new RollingFileSystemSink(); + ConfigBuilder builder = new ConfigBuilder(); - new MyMetrics1().registerWith(ms); + for (String unit : units) { + sink.init(builder.add("sink.roll-interval", num + unit).subset("sink")); + assertEquals(expected, sink.getRollInterval()); - methodDir.setWritable(false); - MockSink.errored = false; + sink.init(builder.add("sink.roll-interval", + num + unit.toUpperCase()).subset("sink")); + assertEquals(expected, sink.getRollInterval()); - try { - // publish the metrics - ms.publishMetricsNow(); + sink.init(builder.add("sink.roll-interval", + num + " " + unit).subset("sink")); + assertEquals(expected, sink.getRollInterval()); - assertFalse("An exception was generated while writing metrics " - + "when the target directory was not 
writable, even though the " - + "sink is set to ignore errors", - MockSink.errored); - - ms.stop(); - ms.shutdown(); - } finally { - // Make sure the dir is writable again so we can delete it at the end - methodDir.setWritable(true); + sink.init(builder.add("sink.roll-interval", + num + " " + unit.toUpperCase()).subset("sink")); + assertEquals(expected, sink.getRollInterval()); } } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/TestRollingFileSystemSinkWithLocal.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/TestRollingFileSystemSinkWithLocal.java new file mode 100644 index 0000000000..96306bf7af --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/TestRollingFileSystemSinkWithLocal.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.metrics2.sink; + +import org.apache.hadoop.metrics2.MetricsSystem; + +import org.junit.Test; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** + * Test the {@link RollingFileSystemSink} class in the context of the local file + * system. 
+ */ +public class TestRollingFileSystemSinkWithLocal + extends RollingFileSystemSinkTestBase { + /** + * Test writing logs to the local file system. + * @throws Exception when things break + */ + @Test + public void testWrite() throws Exception { + String path = methodDir.getAbsolutePath(); + MetricsSystem ms = initMetricsSystem(path, false, false); + + assertMetricsContents(doWriteTest(ms, path, 1)); + } + + /** + * Test writing logs to the local file system with the sink set to ignore + * errors. + * @throws Exception when things break + */ + @Test + public void testSilentWrite() throws Exception { + String path = methodDir.getAbsolutePath(); + MetricsSystem ms = initMetricsSystem(path, true, false); + + assertMetricsContents(doWriteTest(ms, path, 1)); + } + + /** + * Test writing logs locally when the log file already exists. + * + * @throws Exception when things break + */ + @Test + public void testExistingWrite() throws Exception { + String path = methodDir.getAbsolutePath(); + + assertMetricsContents(doAppendTest(path, false, false, 2)); + } + + /** + * Test writing logs locally when the log file and the .1 log file already + * exist. + * + * @throws Exception when things break + */ + @Test + public void testExistingWrite2() throws Exception { + String path = methodDir.getAbsolutePath(); + MetricsSystem ms = initMetricsSystem(path, false, false); + + preCreateLogFile(path, 2); + + assertMetricsContents(doWriteTest(ms, path, 3)); + } + + /** + * Test writing logs locally with ignore errors enabled when + * the log file already exists. + * + * @throws Exception when things break + */ + @Test + public void testSilentExistingWrite() throws Exception { + String path = methodDir.getAbsolutePath(); + + assertMetricsContents(doAppendTest(path, false, false, 2)); + } + + /** + * Test that writing fails when the directory isn't writable. 
+ */ + @Test + public void testFailedWrite() { + String path = methodDir.getAbsolutePath(); + MetricsSystem ms = initMetricsSystem(path, false, false); + + new MyMetrics1().registerWith(ms); + + methodDir.setWritable(false); + MockSink.errored = false; + + try { + // publish the metrics + ms.publishMetricsNow(); + + assertTrue("No exception was generated while writing metrics " + + "even though the target directory was not writable", + MockSink.errored); + + ms.stop(); + ms.shutdown(); + } finally { + // Make sure the dir is writable again so we can delete it at the end + methodDir.setWritable(true); + } + } + + /** + * Test that writing fails silently when the directory is not writable. + */ + @Test + public void testSilentFailedWrite() { + String path = methodDir.getAbsolutePath(); + MetricsSystem ms = initMetricsSystem(path, true, false); + + new MyMetrics1().registerWith(ms); + + methodDir.setWritable(false); + MockSink.errored = false; + + try { + // publish the metrics + ms.publishMetricsNow(); + + assertFalse("An exception was generated while writing metrics " + + "when the target directory was not writable, even though the " + + "sink is set to ignore errors", + MockSink.errored); + + ms.stop(); + ms.shutdown(); + } finally { + // Make sure the dir is writable again so we can delete it at the end + methodDir.setWritable(true); + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/metrics2/sink/TestRollingFileSystemSinkWithHdfs.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/metrics2/sink/TestRollingFileSystemSinkWithHdfs.java index 9984b34fbf..fc25ded180 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/metrics2/sink/TestRollingFileSystemSinkWithHdfs.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/metrics2/sink/TestRollingFileSystemSinkWithHdfs.java @@ -20,8 +20,6 @@ import java.io.IOException; import java.net.URI; -import org.junit.After; -import 
org.junit.Before; import java.util.Calendar; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; @@ -30,9 +28,12 @@ import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.metrics2.MetricsSystem; import org.apache.hadoop.metrics2.sink.RollingFileSystemSinkTestBase.MyMetrics1; +import org.junit.After; +import org.junit.Before; import org.junit.Test; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; /** * Test the {@link RollingFileSystemSink} class in the context of HDFS. @@ -58,7 +59,6 @@ public void setupHdfs() throws IOException { new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES).build(); // Also clear sink flags - RollingFileSystemSink.flushQuickly = false; RollingFileSystemSink.hasFlushed = false; } @@ -251,10 +251,12 @@ public void testSilentFailedClose() throws IOException { */ @Test public void testFlushThread() throws Exception { - RollingFileSystemSink.flushQuickly = true; + // Cause the sink's flush thread to be run immediately after the second + // metrics log is written + RollingFileSystemSink.forceFlush = true; String path = "hdfs://" + cluster.getNameNode().getHostAndPort() + "/tmp"; - MetricsSystem ms = initMetricsSystem(path, true, false); + MetricsSystem ms = initMetricsSystem(path, true, false, false); new MyMetrics1().registerWith(ms); @@ -264,14 +266,21 @@ public void testFlushThread() throws Exception { // regardless. ms.publishMetricsNow(); - // Sleep until the flusher has run + int count = 0; + + // Sleep until the flusher has run. This should never actually need to + // sleep, but the sleep is here to make sure this test isn't flakey. 
while (!RollingFileSystemSink.hasFlushed) { - Thread.sleep(50L); + Thread.sleep(10L); + + if (++count > 1000) { + fail("Flush thread did not run within 10 seconds"); + } } - Calendar now = getNowNotTopOfHour(); + Calendar now = Calendar.getInstance(); + Path currentDir = new Path(path, DATE_FORMAT.format(now.getTime()) + "00"); FileSystem fs = FileSystem.newInstance(new URI(path), new Configuration()); - Path currentDir = new Path(path, DATE_FORMAT.format(now.getTime())); Path currentFile = findMostRecentLogFile(fs, new Path(currentDir, getLogFilename())); FileStatus status = fs.getFileStatus(currentFile); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/metrics2/sink/TestRollingFileSystemSinkWithSecureHdfs.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/metrics2/sink/TestRollingFileSystemSinkWithSecureHdfs.java index dce4fdcf22..906950b46e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/metrics2/sink/TestRollingFileSystemSinkWithSecureHdfs.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/metrics2/sink/TestRollingFileSystemSinkWithSecureHdfs.java @@ -49,8 +49,11 @@ import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.ssl.KeyStoreTestUtil; +import org.junit.After; +import org.junit.AfterClass; import org.junit.Test; -import static org.junit.Assert.assertTrue; +import org.junit.Before; +import org.junit.BeforeClass; import static org.junit.Assert.assertTrue; /** @@ -66,6 +69,75 @@ public class TestRollingFileSystemSinkWithSecureHdfs private static String hdfsPrincipal; private static String hdfsKeytab; private static String spnegoPrincipal; + private MiniDFSCluster cluster = null; + private UserGroupInformation sink = null; + + /** + * Setup the KDC for testing a secure HDFS cluster. 
+ * + * @throws Exception thrown if the KDC setup fails + */ + @BeforeClass + public static void initKdc() throws Exception { + Properties kdcConf = MiniKdc.createConf(); + kdc = new MiniKdc(kdcConf, ROOT_TEST_DIR); + kdc.start(); + + File sinkKeytabFile = new File(ROOT_TEST_DIR, "sink.keytab"); + sinkKeytab = sinkKeytabFile.getAbsolutePath(); + kdc.createPrincipal(sinkKeytabFile, "sink/localhost"); + sinkPrincipal = "sink/localhost@" + kdc.getRealm(); + + File hdfsKeytabFile = new File(ROOT_TEST_DIR, "hdfs.keytab"); + hdfsKeytab = hdfsKeytabFile.getAbsolutePath(); + kdc.createPrincipal(hdfsKeytabFile, "hdfs/localhost", + "HTTP/localhost"); + hdfsPrincipal = "hdfs/localhost@" + kdc.getRealm(); + spnegoPrincipal = "HTTP/localhost@" + kdc.getRealm(); + } + + /** + * Setup the mini-DFS cluster. + * + * @throws Exception thrown if the cluster setup fails + */ + @Before + public void initCluster() throws Exception { + HdfsConfiguration conf = createSecureConfig("authentication,privacy"); + + RollingFileSystemSink.hasFlushed = false; + RollingFileSystemSink.suppliedConf = conf; + + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES) + .build(); + cluster.waitActive(); + createDirectoriesSecurely(); + } + + /** + * Stop the mini-DFS cluster. + */ + @After + public void stopCluster() { + if (cluster != null) { + cluster.shutdown(); + } + + // Restore non-secure conf + UserGroupInformation.setConfiguration(new Configuration()); + RollingFileSystemSink.suppliedConf = null; + RollingFileSystemSink.suppliedFilesystem = null; + } + + /** + * Stop the mini-KDC. + */ + @AfterClass + public static void shutdownKdc() { + if (kdc != null) { + kdc.stop(); + } + } /** * Do a basic write test against an HDFS cluster with Kerberos enabled. 
We @@ -76,46 +148,18 @@ public class TestRollingFileSystemSinkWithSecureHdfs */ @Test public void testWithSecureHDFS() throws Exception { - RollingFileSystemSink.flushQuickly = false; - RollingFileSystemSink.hasFlushed = false; - initKdc(); + final String path = + "hdfs://" + cluster.getNameNode().getHostAndPort() + "/tmp/test"; + final MetricsSystem ms = + initMetricsSystem(path, true, false, true); - MiniDFSCluster cluster = null; - - try { - HdfsConfiguration conf = createSecureConfig("authentication,privacy"); - - RollingFileSystemSink.suppliedConf = conf; - - cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES) - .build(); - - cluster.waitActive(); - - UserGroupInformation sink = createDirectoriesSecurely(cluster); - final String path = - "hdfs://" + cluster.getNameNode().getHostAndPort() + "/tmp/test"; - final MetricsSystem ms = initMetricsSystem(path, true, false, true); - - assertMetricsContents( - sink.doAs(new PrivilegedExceptionAction