HADOOP-12759. RollingFileSystemSink should eagerly rotate directories. Contributed by Daniel Templeton.

This commit is contained in:
Andrew Wang 2016-02-06 20:52:35 -08:00
parent 1495ff979a
commit 5b59a0ea85
3 changed files with 227 additions and 84 deletions

View File

@ -1102,6 +1102,9 @@ Release 2.8.0 - UNRELEASED
HADOOP-12292. Make use of DeleteObjects optional.
(Thomas Demoor via stevel)
HADOOP-12759. RollingFileSystemSink should eagerly rotate directories.
(Daniel Templeton via wang)
OPTIMIZATIONS
HADOOP-11785. Reduce the number of listStatus operation in distcp

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.metrics2.sink;
import com.google.common.annotations.VisibleForTesting;
import java.io.Closeable;
import java.io.IOException;
import java.io.PrintStream;
@ -25,8 +26,11 @@
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.Calendar;
import java.util.Date;
import java.util.TimeZone;
import java.util.Timer;
import java.util.TimerTask;
import org.apache.commons.configuration.SubsetConfiguration;
import org.apache.commons.lang.time.FastDateFormat;
@ -35,7 +39,9 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.metrics2.AbstractMetric;
import org.apache.hadoop.metrics2.MetricsException;
import org.apache.hadoop.metrics2.MetricsRecord;
@ -43,7 +49,7 @@
import org.apache.hadoop.metrics2.MetricsTag;
/**
* This class is a metrics sink that uses
* <p>This class is a metrics sink that uses
* {@link org.apache.hadoop.fs.FileSystem} to write the metrics logs. Every
* hour a new directory will be created under the path specified by the
* <code>basepath</code> property. All metrics will be logged to a file in the
@ -54,51 +60,53 @@
* time zone used to create the current hour's directory name is GMT. If the
* <code>basepath</code> property isn't specified, it will default to
* &quot;/tmp&quot;, which is the temp directory on whatever default file
* system is configured for the cluster.
* system is configured for the cluster.</p>
*
* The <code>&lt;prefix&gt;.sink.&lt;instance&gt;.ignore-error</code> property
* controls whether an exception is thrown when an error is encountered writing
* a log file. The default value is <code>true</code>. When set to
* <code>false</code>, file errors are quietly swallowed.
* <p>The <code>&lt;prefix&gt;.sink.&lt;instance&gt;.ignore-error</code>
* property controls whether an exception is thrown when an error is encountered
* writing a log file. The default value is <code>true</code>. When set to
* <code>false</code>, file errors are quietly swallowed.</p>
*
* The primary use of this class is for logging to HDFS. As it uses
* <p>The primary use of this class is for logging to HDFS. As it uses
* {@link org.apache.hadoop.fs.FileSystem} to access the target file system,
* however, it can be used to write to the local file system, Amazon S3, or any
* other supported file system. The base path for the sink will determine the
* file system used. An unqualified path will write to the default file system
* set by the configuration.
* set by the configuration.</p>
*
* Not all file systems support the ability to append to files. In file systems
* without the ability to append to files, only one writer can write to a file
* at a time. To allow for concurrent writes from multiple daemons on a single
* host, the <code>source</code> property should be set to the name of the
* source daemon, e.g. <i>namenode</i>. The value of the <code>source</code>
* property should typically be the same as the property's prefix. If this
* property is not set, the source is taken to be <i>unknown</i>.
* <p>Not all file systems support the ability to append to files. In file
* systems without the ability to append to files, only one writer can write to
* a file at a time. To allow for concurrent writes from multiple daemons on a
* single host, the <code>source</code> property should be set to the name of
* the source daemon, e.g. <i>namenode</i>. The value of the
* <code>source</code> property should typically be the same as the property's
* prefix. If this property is not set, the source is taken to be
* <i>unknown</i>.</p>
*
* Instead of appending to an existing file, by default the sink
* <p>Instead of appending to an existing file, by default the sink
* will create a new file with a suffix of &quot;.&lt;n&gt;&quet;, where
* <i>n</i> is the next lowest integer that isn't already used in a file name,
* similar to the Hadoop daemon logs. NOTE: the file with the <b>highest</b>
* sequence number is the <b>newest</b> file, unlike the Hadoop daemon logs.
* sequence number is the <b>newest</b> file, unlike the Hadoop daemon logs.</p>
*
* For file systems that allow append, the sink supports appending to the
* <p>For file systems that allow append, the sink supports appending to the
* existing file instead. If the <code>allow-append</code> property is set to
* true, the sink will instead append to the existing file on file systems that
* support appends. By default, the <code>allow-append</code> property is
* false.
* false.</p>
*
* Note that when writing to HDFS with <code>allow-append</code> set to true,
* <p>Note that when writing to HDFS with <code>allow-append</code> set to true,
* there is a minimum acceptable number of data nodes. If the number of data
* nodes drops below that minimum, the append will succeed, but reading the
* data will fail with an IOException in the DataStreamer class. The minimum
* number of data nodes required for a successful append is generally 2 or 3.
* number of data nodes required for a successful append is generally 2 or
* 3.</p>
*
* Note also that when writing to HDFS, the file size information is not updated
* until the file is closed (e.g. at the top of the hour) even though the data
* is being written successfully. This is a known HDFS limitation that exists
* because of the performance cost of updating the metadata. See
* <a href="https://issues.apache.org/jira/browse/HDFS-5478">HDFS-5478</a>.
* <p>Note also that when writing to HDFS, the file size information is not
* updated until the file is closed (e.g. at the top of the hour) even though
* the data is being written successfully. This is a known HDFS limitation that
* exists because of the performance cost of updating the metadata. See
* <a href="https://issues.apache.org/jira/browse/HDFS-5478">HDFS-5478</a>.</p>
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
@ -111,6 +119,7 @@ public class RollingFileSystemSink implements MetricsSink, Closeable {
private static final String BASEPATH_DEFAULT = "/tmp";
private static final FastDateFormat DATE_FORMAT =
FastDateFormat.getInstance("yyyyMMddHH", TimeZone.getTimeZone("GMT"));
private final Object lock = new Object();
private String source;
private boolean ignoreError;
private boolean allowAppend;
@ -124,6 +133,11 @@ public class RollingFileSystemSink implements MetricsSink, Closeable {
private PrintStream currentOutStream;
// We keep this only to be able to call hsynch() on it.
private FSDataOutputStream currentFSOutStream;
private Timer flushTimer;
@VisibleForTesting
protected static boolean isTest = false;
@VisibleForTesting
protected static volatile boolean hasFlushed = false;
@Override
public void init(SubsetConfiguration conf) {
@ -147,6 +161,8 @@ public void init(SubsetConfiguration conf) {
if (allowAppend) {
allowAppend = checkAppend(fileSystem);
}
flushTimer = new Timer("RollingFileSystemSink Flusher", true);
}
/**
@ -175,27 +191,71 @@ private boolean checkAppend(FileSystem fs) {
* new directory or new log file
*/
private void rollLogDirIfNeeded() throws MetricsException {
String currentDir = DATE_FORMAT.format(new Date());
Date now = new Date();
String currentDir = DATE_FORMAT.format(now);
Path path = new Path(basePath, currentDir);
// We check whether currentOutStream is null instead of currentDirPath,
// because if currentDirPath is null, then currentOutStream is null, but
// currentOutStream can be null for other reasons.
if ((currentOutStream == null) || !path.equals(currentDirPath)) {
currentDirPath = path;
// Close the stream. This step could have been handled already by the
// flusher thread, but if it has, the PrintStream will just swallow the
// exception, which is fine.
if (currentOutStream != null) {
currentOutStream.close();
}
currentDirPath = path;
try {
rollLogDir();
} catch (IOException ex) {
throwMetricsException("Failed to creating new log file", ex);
throwMetricsException("Failed to create new log file", ex);
}
scheduleFlush(now);
}
}
/**
* Schedule the current hour's directory to be flushed at the top of the next
* hour. If this ends up running after the top of the next hour, it will
* execute immediately.
*
* @param now the current time
*/
private void scheduleFlush(Date now) {
// Store the current currentDirPath to close later
final PrintStream toClose = currentOutStream;
Calendar next = Calendar.getInstance();
next.setTime(now);
if (isTest) {
// If we're running unit tests, flush after a short pause
next.add(Calendar.MILLISECOND, 400);
} else {
// Otherwise flush at the top of the hour
next.set(Calendar.SECOND, 0);
next.set(Calendar.MINUTE, 0);
next.add(Calendar.HOUR, 1);
}
flushTimer.schedule(new TimerTask() {
@Override
public void run() {
synchronized (lock) {
// This close may have already been done by a putMetrics() call. If it
// has, the PrintStream will swallow the exception, which is fine.
toClose.close();
}
hasFlushed = true;
}
}, next.getTime());
}
/**
* Create a new directory based on the current hour and a new log file in
* that directory.
@ -231,7 +291,9 @@ private void rollLogDir() throws IOException {
*/
private void createLogFile(Path initial) throws IOException {
Path currentAttempt = initial;
int id = 1;
// Start at 0 so that if the base filname exists, we start with the suffix
// ".1".
int id = 0;
while (true) {
// First try blindly creating the file. If we fail, it either means
@ -248,8 +310,8 @@ private void createLogFile(Path initial) throws IOException {
} catch (IOException ex) {
// Now we can check to see if the file exists to know why we failed
if (fileSystem.exists(currentAttempt)) {
id = getNextIdToTry(initial, id);
currentAttempt = new Path(initial.toString() + "." + id);
id += 1;
} else {
throw ex;
}
@ -257,6 +319,66 @@ private void createLogFile(Path initial) throws IOException {
}
}
/**
* Return the next ID suffix to use when creating the log file. This method
* will look at the files in the directory, find the one with the highest
* ID suffix, and 1 to that suffix, and return it. This approach saves a full
* linear probe, which matters in the case where there are a large number of
* log files.
*
* @param initial the base file path
* @param lastId the last ID value that was used
* @return the next ID to try
* @throws IOException thrown if there's an issue querying the files in the
* directory
*/
private int getNextIdToTry(Path initial, int lastId)
throws IOException {
RemoteIterator<LocatedFileStatus> files =
fileSystem.listFiles(currentDirPath, true);
String base = initial.toString();
int id = lastId;
while (files.hasNext()) {
String file = files.next().getPath().getName();
if (file.startsWith(base)) {
int fileId = extractId(file);
if (fileId > id) {
id = fileId;
}
}
}
// Return either 1 more than the highest we found or 1 more than the last
// ID used (if no ID was found).
return id + 1;
}
/**
* Extract the ID from the suffix of the given file name.
*
* @param file the file name
* @return the ID or -1 if no ID could be extracted
*/
private int extractId(String file) {
int index = file.lastIndexOf(".");
int id = -1;
// A hostname has to have at least 1 character
if (index > 0) {
try {
id = Integer.parseInt(file.substring(index + 1));
} catch (NumberFormatException ex) {
// This can happen if there's no suffix, but there is a dot in the
// hostname. Just ignore it.
}
}
return id;
}
/**
* Create a new log file and return the {@link FSDataOutputStream}. If a
* file with the specified path already exists, open the file for append
@ -303,65 +425,72 @@ private void createOrAppendLogFile(Path targetFile) throws IOException {
@Override
public void putMetrics(MetricsRecord record) {
rollLogDirIfNeeded();
synchronized (lock) {
rollLogDirIfNeeded();
if (currentOutStream != null) {
currentOutStream.printf("%d %s.%s", record.timestamp(),
record.context(), record.name());
if (currentOutStream != null) {
currentOutStream.printf("%d %s.%s", record.timestamp(),
record.context(), record.name());
String separator = ": ";
String separator = ": ";
for (MetricsTag tag : record.tags()) {
currentOutStream.printf("%s%s=%s", separator, tag.name(), tag.value());
separator = ", ";
for (MetricsTag tag : record.tags()) {
currentOutStream.printf("%s%s=%s", separator, tag.name(),
tag.value());
separator = ", ";
}
for (AbstractMetric metric : record.metrics()) {
currentOutStream.printf("%s%s=%s", separator, metric.name(),
metric.value());
}
currentOutStream.println();
// If we don't hflush(), the data may not be written until the file is
// closed. The file won't be closed until the top of the hour *AND*
// another record is received. Calling hflush() makes sure that the data
// is complete at the top of the hour.
try {
currentFSOutStream.hflush();
} catch (IOException ex) {
throwMetricsException("Failed flushing the stream", ex);
}
checkForErrors("Unable to write to log file");
} else if (!ignoreError) {
throwMetricsException("Unable to write to log file");
}
for (AbstractMetric metric : record.metrics()) {
currentOutStream.printf("%s%s=%s", separator, metric.name(),
metric.value());
}
currentOutStream.println();
// If we don't hflush(), the data may not be written until the file is
// closed. The file won't be closed until the top of the hour *AND*
// another record is received. Calling hflush() makes sure that the data
// is complete at the top of the hour.
try {
currentFSOutStream.hflush();
} catch (IOException ex) {
throwMetricsException("Failed flushing the stream", ex);
}
checkForErrors("Unable to write to log file");
} else if (!ignoreError) {
throwMetricsException("Unable to write to log file");
}
}
@Override
public void flush() {
// currentOutStream is null if currentFSOutStream is null
if (currentFSOutStream != null) {
try {
currentFSOutStream.hflush();
} catch (IOException ex) {
throwMetricsException("Unable to flush log file", ex);
synchronized (lock) {
// currentOutStream is null if currentFSOutStream is null
if (currentFSOutStream != null) {
try {
currentFSOutStream.hflush();
} catch (IOException ex) {
throwMetricsException("Unable to flush log file", ex);
}
}
}
}
@Override
public void close() throws IOException {
if (currentOutStream != null) {
currentOutStream.close();
public void close() {
synchronized (lock) {
if (currentOutStream != null) {
currentOutStream.close();
try {
checkForErrors("Unable to close log file");
} finally {
// Null out the streams just in case someone tries to reuse us.
currentOutStream = null;
currentFSOutStream = null;
try {
checkForErrors("Unable to close log file");
} finally {
// Null out the streams just in case someone tries to reuse us.
currentOutStream = null;
currentFSOutStream = null;
}
}
}
}

View File

@ -25,6 +25,7 @@
import java.net.InetAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.Calendar;
@ -214,8 +215,7 @@ protected String doWriteTest(MetricsSystem ms, String path, int count)
protected String readLogFile(String path, String then, int count)
throws IOException, URISyntaxException {
final String now = DATE_FORMAT.format(new Date());
final String logFile =
"testsrc-" + InetAddress.getLocalHost().getHostName() + ".log";
final String logFile = getLogFilename();
FileSystem fs = FileSystem.get(new URI(path), new Configuration());
StringBuilder metrics = new StringBuilder();
boolean found = false;
@ -258,7 +258,10 @@ protected void readLogData(FileSystem fs, Path logFile, StringBuilder metrics)
}
/**
* Return the path to the log file to use, based on the target path.
* Return the path to the log file to use, based on the initial path. The
* initial path must be a valid log file path. This method will find the
* most recent version of the file.
*
* @param fs the target FileSystem
* @param initial the path from which to start
* @return the path to use
@ -275,9 +278,19 @@ protected Path findMostRecentLogFile(FileSystem fs, Path initial)
nextLogFile = new Path(initial.toString() + "." + id);
id += 1;
} while (fs.exists(nextLogFile));
return logFile;
}
/**
* Return the name of the log file for this host.
*
* @return the name of the log file for this host
*/
protected static String getLogFilename() throws UnknownHostException {
return "testsrc-" + InetAddress.getLocalHost().getHostName() + ".log";
}
/**
* Assert that the given contents match what is expected from the test
* metrics.
@ -392,8 +405,7 @@ protected void preCreateLogFile(String path, int numFiles)
fs.mkdirs(dir);
Path file = new Path(dir,
"testsrc-" + InetAddress.getLocalHost().getHostName() + ".log");
Path file = new Path(dir, getLogFilename());
// Create the log file to force the sink to append
try (FSDataOutputStream out = fs.create(file)) {
@ -405,8 +417,7 @@ protected void preCreateLogFile(String path, int numFiles)
int count = 1;
while (count < numFiles) {
file = new Path(dir, "testsrc-"
+ InetAddress.getLocalHost().getHostName() + ".log." + count);
file = new Path(dir, getLogFilename() + "." + count);
// Create the log file to force the sink to append
try (FSDataOutputStream out = fs.create(file)) {
@ -482,7 +493,7 @@ public void putMetrics(MetricsRecord record) {
}
@Override
public void close() throws IOException {
public void close() {
try {
super.close();
} catch (MetricsException ex) {