HADOOP-18631. Migrate Async appenders to log4j properties (#5418)

This commit is contained in:
Viraj Jasani 2023-02-25 09:47:44 -08:00 committed by GitHub
parent 25ebd0b8b1
commit a90238c0b8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 425 additions and 366 deletions

View File

@ -310,4 +310,14 @@
<Method name="reconcile" />
<Bug pattern="SWL_SLEEP_WITH_LOCK_HELD" />
</Match>
<!--
conversionPattern is only set once and used to initiate PatternLayout object
only once. It is set by log4j framework if set as part of log4j properties and accessed
only during first append operation.
-->
<Match>
<Class name="org.apache.hadoop.hdfs.util.AsyncRFAAppender"/>
<Field name="conversionPattern"/>
<Bug pattern="IS2_INCONSISTENT_SYNC"/>
</Match>
</FindBugsFilter>

View File

@ -733,12 +733,43 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_NAMENODE_DEFAULT_AUDIT_LOGGER_NAME = "default";
public static final String DFS_NAMENODE_AUDIT_LOG_TOKEN_TRACKING_ID_KEY = "dfs.namenode.audit.log.token.tracking.id";
public static final boolean DFS_NAMENODE_AUDIT_LOG_TOKEN_TRACKING_ID_DEFAULT = false;
public static final String DFS_NAMENODE_AUDIT_LOG_ASYNC_KEY = "dfs.namenode.audit.log.async";
/**
* Deprecated. Use log4j properties instead.
* Set system env variable HDFS_AUDIT_LOGGER, which in tern assigns the value to
* "hdfs.audit.logger" for log4j properties to determine log level and appender.
*/
@Deprecated
public static final String DFS_NAMENODE_AUDIT_LOG_ASYNC_KEY = "dfs.namenode.audit.log.async";
@Deprecated
public static final boolean DFS_NAMENODE_AUDIT_LOG_ASYNC_DEFAULT = false;
public static final String DFS_NAMENODE_AUDIT_LOG_ASYNC_BLOCKING_KEY = "dfs.namenode.audit.log.async.blocking";
/**
* Deprecated. Use log4j properties instead.
* Set value to Async appender "blocking" property as part of log4j properties configuration.
* <p>
* For example,
* log4j.appender.ASYNCAPPENDER=org.apache.log4j.AsyncAppender
* log4j.appender.ASYNCAPPENDER.blocking=false
*/
@Deprecated
public static final String DFS_NAMENODE_AUDIT_LOG_ASYNC_BLOCKING_KEY =
"dfs.namenode.audit.log.async.blocking";
@Deprecated
public static final boolean DFS_NAMENODE_AUDIT_LOG_ASYNC_BLOCKING_DEFAULT = true;
public static final String DFS_NAMENODE_AUDIT_LOG_ASYNC_BUFFER_SIZE_KEY = "dfs.namenode.audit.log.async.buffer.size";
public static final int DFS_NAMENODE_AUDIT_LOG_ASYNC_BUFFER_SIZE_DEFAULT = 128;
/**
* Deprecated. Use log4j properties instead.
* Set value to Async appender "bufferSize" property as part of log4j properties configuration.
* <p>
* For example,
* log4j.appender.ASYNCAPPENDER=org.apache.log4j.AsyncAppender
* log4j.appender.ASYNCAPPENDER.bufferSize=128
*/
@Deprecated
public static final String DFS_NAMENODE_AUDIT_LOG_ASYNC_BUFFER_SIZE_KEY =
"dfs.namenode.audit.log.async.buffer.size";
@Deprecated
public static final int DFS_NAMENODE_AUDIT_LOG_ASYNC_BUFFER_SIZE_DEFAULT = 128;
public static final String DFS_NAMENODE_AUDIT_LOG_DEBUG_CMDLIST = "dfs.namenode.audit.log.debug.cmdlist";
public static final String DFS_NAMENODE_METRICS_LOGGER_PERIOD_SECONDS_KEY =
"dfs.namenode.metrics.logger.period.seconds";

View File

@ -18,9 +18,7 @@
package org.apache.hadoop.hdfs.server.common;
import java.lang.management.ManagementFactory;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import javax.management.Attribute;
@ -34,8 +32,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.log4j.Appender;
import org.apache.log4j.AsyncAppender;
/**
* MetricsLoggerTask can be used as utility to dump metrics to log.
@ -56,12 +52,12 @@ public class MetricsLoggerTask implements Runnable {
}
}
private org.apache.log4j.Logger metricsLog;
private Logger metricsLog;
private String nodeName;
private short maxLogLineLength;
public MetricsLoggerTask(String metricsLog, String nodeName, short maxLogLineLength) {
this.metricsLog = org.apache.log4j.Logger.getLogger(metricsLog);
this.metricsLog = LoggerFactory.getLogger(metricsLog);
this.nodeName = nodeName;
this.maxLogLineLength = maxLogLineLength;
}
@ -115,8 +111,11 @@ private String trimLine(String valueStr) {
.substring(0, maxLogLineLength) + "...");
}
private static boolean hasAppenders(org.apache.log4j.Logger logger) {
return logger.getAllAppenders().hasMoreElements();
// TODO : hadoop-logging module to hide log4j implementation details, this method
// can directly call utility from hadoop-logging.
private static boolean hasAppenders(Logger logger) {
return org.apache.log4j.Logger.getLogger(logger.getName()).getAllAppenders()
.hasMoreElements();
}
/**
@ -138,26 +137,4 @@ private static Set<String> getFilteredAttributes(MBeanInfo mBeanInfo) {
return attributeNames;
}
/**
* Make the metrics logger async and add all pre-existing appenders to the
* async appender.
*/
public static void makeMetricsLoggerAsync(String metricsLog) {
org.apache.log4j.Logger logger = org.apache.log4j.Logger.getLogger(metricsLog);
logger.setAdditivity(false); // Don't pollute actual logs with metrics dump
@SuppressWarnings("unchecked")
List<Appender> appenders = Collections.list(logger.getAllAppenders());
// failsafe against trying to async it more than once
if (!appenders.isEmpty() && !(appenders.get(0) instanceof AsyncAppender)) {
AsyncAppender asyncAppender = new AsyncAppender();
// change logger to have an async appender containing all the
// previously configured appenders
for (Appender appender : appenders) {
logger.removeAppender(appender);
asyncAppender.addAppender(appender);
}
logger.addAppender(asyncAppender);
}
}
}

View File

@ -4058,8 +4058,6 @@ protected void startMetricsLogger() {
return;
}
MetricsLoggerTask.makeMetricsLoggerAsync(METRICS_LOG_NAME);
// Schedule the periodic logging.
metricsLoggerTimer = new ScheduledThreadPoolExecutor(1);
metricsLoggerTimer.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);

View File

@ -338,10 +338,9 @@
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.util.Lists;
import org.apache.log4j.Logger;
import org.apache.log4j.Appender;
import org.apache.log4j.AsyncAppender;
import org.eclipse.jetty.util.ajax.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.thirdparty.com.google.common.base.Charsets;
@ -349,8 +348,6 @@
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.LoggerFactory;
/**
* FSNamesystem is a container of both transient
* and persisted name-space state, and does all the book-keeping
@ -384,8 +381,7 @@
public class FSNamesystem implements Namesystem, FSNamesystemMBean,
NameNodeMXBean, ReplicatedBlocksMBean, ECBlockGroupsMBean {
public static final org.slf4j.Logger LOG = LoggerFactory
.getLogger(FSNamesystem.class.getName());
public static final Logger LOG = LoggerFactory.getLogger(FSNamesystem.class);
// The following are private configurations
public static final String DFS_NAMENODE_SNAPSHOT_TRASHROOT_ENABLED =
@ -488,7 +484,8 @@ private boolean isClientPortInfoAbsent(CallerContext ctx){
* perm=&lt;permissions (optional)&gt;
* </code>
*/
public static final Logger AUDIT_LOG = Logger.getLogger(FSNamesystem.class.getName() + ".audit");
public static final Logger AUDIT_LOG =
LoggerFactory.getLogger(FSNamesystem.class.getName() + ".audit");
private final int maxCorruptFileBlocksReturn;
private final boolean isPermissionEnabled;
@ -858,11 +855,7 @@ static FSNamesystem loadFromDisk(Configuration conf) throws IOException {
throws IOException {
provider = DFSUtil.createKeyProviderCryptoExtension(conf);
LOG.info("KeyProvider: " + provider);
if (conf.getBoolean(DFS_NAMENODE_AUDIT_LOG_ASYNC_KEY,
DFS_NAMENODE_AUDIT_LOG_ASYNC_DEFAULT)) {
LOG.info("Enabling async auditlog");
enableAsyncAuditLog(conf);
}
checkForAsyncLogEnabledByOldConfigs(conf);
auditLogWithRemotePort =
conf.getBoolean(DFS_NAMENODE_AUDIT_LOG_WITH_REMOTE_PORT_KEY,
DFS_NAMENODE_AUDIT_LOG_WITH_REMOTE_PORT_DEFAULT);
@ -1076,6 +1069,14 @@ static FSNamesystem loadFromDisk(Configuration conf) throws IOException {
}
}
@SuppressWarnings("deprecation")
private static void checkForAsyncLogEnabledByOldConfigs(Configuration conf) {
if (conf.getBoolean(DFS_NAMENODE_AUDIT_LOG_ASYNC_KEY, DFS_NAMENODE_AUDIT_LOG_ASYNC_DEFAULT)) {
LOG.warn("Use log4j properties to enable async log for audit logs. {} is deprecated",
DFS_NAMENODE_AUDIT_LOG_ASYNC_KEY);
}
}
@VisibleForTesting
public List<AuditLogger> getAuditLoggers() {
return auditLoggers;
@ -8856,30 +8857,6 @@ public void logAuditMessage(String message) {
}
}
private static void enableAsyncAuditLog(Configuration conf) {
Logger logger = AUDIT_LOG;
@SuppressWarnings("unchecked")
List<Appender> appenders = Collections.list(logger.getAllAppenders());
// failsafe against trying to async it more than once
if (!appenders.isEmpty() && !(appenders.get(0) instanceof AsyncAppender)) {
AsyncAppender asyncAppender = new AsyncAppender();
asyncAppender.setBlocking(conf.getBoolean(
DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_ASYNC_BLOCKING_KEY,
DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_ASYNC_BLOCKING_DEFAULT
));
asyncAppender.setBufferSize(conf.getInt(
DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_ASYNC_BUFFER_SIZE_KEY,
DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_ASYNC_BUFFER_SIZE_DEFAULT
));
// change logger to have an async appender containing all the
// previously configured appenders
for (Appender appender : appenders) {
logger.removeAppender(appender);
asyncAppender.addAppender(appender);
}
logger.addAppender(asyncAppender);
}
}
/**
* Return total number of Sync Operations on FSEditLog.
*/

View File

@ -946,8 +946,6 @@ protected void startMetricsLogger(Configuration conf) {
return;
}
MetricsLoggerTask.makeMetricsLoggerAsync(METRICS_LOG_NAME);
// Schedule the periodic logging.
metricsLoggerTimer = new ScheduledThreadPoolExecutor(1);
metricsLoggerTimer.setExecuteExistingDelayedTasksAfterShutdownPolicy(

View File

@ -0,0 +1,146 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.util;
import java.io.IOException;
import org.apache.log4j.AsyncAppender;
import org.apache.log4j.PatternLayout;
import org.apache.log4j.RollingFileAppender;
import org.apache.log4j.spi.LoggingEvent;
/**
* Until we migrate to log4j2, use this appender for namenode audit logger as well as
* datanode and namenode metric loggers with log4j properties, if async logging is required with
* RFA.
* This appender will take parameters necessary to supply RollingFileAppender to AsyncAppender.
* While migrating to log4j2, we can directly wrap RFA appender to Async appender as part of
* log4j2 properties. However, same is not possible with log4j1 properties.
*/
public class AsyncRFAAppender extends AsyncAppender {
/**
* The default maximum file size is 10MB.
*/
private String maxFileSize = String.valueOf(10*1024*1024);
/**
* There is one backup file by default.
*/
private int maxBackupIndex = 1;
/**
* The name of the log file.
*/
private String fileName = null;
private String conversionPattern = null;
/**
* Does appender block when buffer is full.
*/
private boolean blocking = true;
/**
* Buffer size.
*/
private int bufferSize = DEFAULT_BUFFER_SIZE;
private RollingFileAppender rollingFileAppender = null;
private volatile boolean isRollingFileAppenderAssigned = false;
@Override
public void append(LoggingEvent event) {
if (rollingFileAppender == null) {
appendRFAToAsyncAppender();
}
super.append(event);
}
private synchronized void appendRFAToAsyncAppender() {
if (!isRollingFileAppenderAssigned) {
PatternLayout patternLayout;
if (conversionPattern != null) {
patternLayout = new PatternLayout(conversionPattern);
} else {
patternLayout = new PatternLayout();
}
try {
rollingFileAppender = new RollingFileAppender(patternLayout, fileName, true);
} catch (IOException e) {
throw new RuntimeException(e);
}
rollingFileAppender.setMaxBackupIndex(maxBackupIndex);
rollingFileAppender.setMaxFileSize(maxFileSize);
this.addAppender(rollingFileAppender);
isRollingFileAppenderAssigned = true;
super.setBlocking(blocking);
super.setBufferSize(bufferSize);
}
}
public String getMaxFileSize() {
return maxFileSize;
}
public void setMaxFileSize(String maxFileSize) {
this.maxFileSize = maxFileSize;
}
public int getMaxBackupIndex() {
return maxBackupIndex;
}
public void setMaxBackupIndex(int maxBackupIndex) {
this.maxBackupIndex = maxBackupIndex;
}
public String getFileName() {
return fileName;
}
public void setFileName(String fileName) {
this.fileName = fileName;
}
public String getConversionPattern() {
return conversionPattern;
}
public void setConversionPattern(String conversionPattern) {
this.conversionPattern = conversionPattern;
}
public boolean isBlocking() {
return blocking;
}
public void setBlocking(boolean blocking) {
this.blocking = blocking;
}
public int getBufferSize() {
return bufferSize;
}
public void setBufferSize(int bufferSize) {
this.bufferSize = bufferSize;
}
}

View File

@ -30,7 +30,6 @@
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeoutException;
import java.util.regex.Pattern;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -40,12 +39,11 @@
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.namenode.PatternMatchingAppender;
import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.log4j.Appender;
import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.AsyncAppender;
import org.apache.log4j.spi.LoggingEvent;
import org.junit.After;
import org.junit.Assert;
import org.junit.Rule;
@ -151,9 +149,9 @@ public void testMetricsLogOutput() throws IOException, InterruptedException,
metricsProvider);
startDNForTest(true);
assertNotNull(dn);
final PatternMatchingAppender appender = new PatternMatchingAppender(
"^.*FakeMetric.*$");
addAppender(org.apache.log4j.Logger.getLogger(DataNode.METRICS_LOG_NAME), appender);
final PatternMatchingAppender appender =
(PatternMatchingAppender) org.apache.log4j.Logger.getLogger(DataNode.METRICS_LOG_NAME)
.getAppender("PATTERNMATCHERAPPENDER");
// Ensure that the supplied pattern was matched.
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@ -186,37 +184,4 @@ public int getFakeMetric() {
}
}
/**
* An appender that matches logged messages against the given regular
* expression.
*/
public static class PatternMatchingAppender extends AppenderSkeleton {
private final Pattern pattern;
private volatile boolean matched;
public PatternMatchingAppender(String pattern) {
this.pattern = Pattern.compile(pattern);
this.matched = false;
}
public boolean isMatched() {
return matched;
}
@Override
protected void append(LoggingEvent event) {
if (pattern.matcher(event.getMessage().toString()).matches()) {
matched = true;
}
}
@Override
public void close() {
}
@Override
public boolean requiresLayout() {
return false;
}
}
}

View File

@ -0,0 +1,58 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import java.util.regex.Pattern;
import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.spi.LoggingEvent;
/**
* An appender that matches logged messages against the given
* regular expression.
*/
public class PatternMatchingAppender extends AppenderSkeleton {
private final Pattern pattern;
private volatile boolean matched;
public PatternMatchingAppender() {
this.pattern = Pattern.compile("^.*FakeMetric.*$");
this.matched = false;
}
public boolean isMatched() {
return matched;
}
@Override
protected void append(LoggingEvent event) {
if (pattern.matcher(event.getMessage().toString()).matches()) {
matched = true;
}
}
@Override
public void close() {
}
@Override
public boolean requiresLayout() {
return false;
}
}

View File

@ -26,11 +26,11 @@
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.FSNamesystemAuditLogger;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.log4j.Level;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.slf4j.event.Level;
import java.net.Inet4Address;
import java.util.Arrays;

View File

@ -25,10 +25,10 @@
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Enumeration;
import java.util.List;
import java.util.regex.Pattern;
@ -46,15 +46,10 @@
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.PathUtils;
import org.apache.log4j.Appender;
import org.apache.log4j.AsyncAppender;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.log4j.PatternLayout;
import org.apache.log4j.RollingFileAppender;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -68,36 +63,39 @@
*/
@RunWith(Parameterized.class)
public class TestAuditLogs {
static final String auditLogFile = PathUtils.getTestDirName(TestAuditLogs.class) + "/TestAuditLogs-audit.log";
final boolean useAsyncLog;
private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(TestAuditLogs.class);
private static final File AUDIT_LOG_FILE =
new File(System.getProperty("hadoop.log.dir"), "hdfs-audit.log");
final boolean useAsyncEdits;
@Parameters
public static Collection<Object[]> data() {
Collection<Object[]> params = new ArrayList<Object[]>();
params.add(new Object[]{Boolean.FALSE, Boolean.FALSE});
params.add(new Object[]{Boolean.TRUE, Boolean.FALSE});
params.add(new Object[]{Boolean.FALSE, Boolean.TRUE});
params.add(new Object[]{Boolean.TRUE, Boolean.TRUE});
Collection<Object[]> params = new ArrayList<>();
params.add(new Object[]{Boolean.FALSE});
params.add(new Object[]{Boolean.TRUE});
return params;
}
public TestAuditLogs(boolean useAsyncLog, boolean useAsyncEdits) {
this.useAsyncLog = useAsyncLog;
public TestAuditLogs(boolean useAsyncEdits) {
this.useAsyncEdits = useAsyncEdits;
}
// Pattern for:
// allowed=(true|false) ugi=name ip=/address cmd={cmd} src={path} dst=null perm=null
static final Pattern auditPattern = Pattern.compile(
private static final Pattern AUDIT_PATTERN = Pattern.compile(
"allowed=.*?\\s" +
"ugi=.*?\\s" +
"ip=/\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\s" +
"cmd=.*?\\ssrc=.*?\\sdst=null\\s" +
"perm=.*?");
static final Pattern successPattern = Pattern.compile(
private static final Pattern SUCCESS_PATTERN = Pattern.compile(
".*allowed=true.*");
static final Pattern webOpenPattern = Pattern.compile(
private static final Pattern FAILURE_PATTERN = Pattern.compile(
".*allowed=false.*");
private static final Pattern WEB_OPEN_PATTERN = Pattern.compile(
".*cmd=open.*proto=webhdfs.*");
static final String username = "bob";
@ -113,14 +111,15 @@ public TestAuditLogs(boolean useAsyncLog, boolean useAsyncEdits) {
@Before
public void setupCluster() throws Exception {
try (PrintWriter writer = new PrintWriter(AUDIT_LOG_FILE)) {
writer.print("");
}
// must configure prior to instantiating the namesystem because it
// will reconfigure the logger if async is enabled
configureAuditLogs();
conf = new HdfsConfiguration();
final long precision = 1L;
conf.setLong(DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_KEY, precision);
conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 10000L);
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_ASYNC_KEY, useAsyncLog);
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_EDITS_ASYNC_LOGGING, useAsyncEdits);
util = new DFSTestUtil.Builder().setName("TestAuditAllowed").
setNumFiles(20).build();
@ -129,19 +128,25 @@ public void setupCluster() throws Exception {
util.createFiles(fs, fileName);
// make sure the appender is what it's supposed to be
Logger logger = FSNamesystem.AUDIT_LOG;
Logger logger = org.apache.log4j.Logger.getLogger(
"org.apache.hadoop.hdfs.server.namenode.FSNamesystem.audit");
@SuppressWarnings("unchecked")
List<Appender> appenders = Collections.list(logger.getAllAppenders());
assertEquals(1, appenders.size());
assertEquals(useAsyncLog, appenders.get(0) instanceof AsyncAppender);
assertTrue(appenders.get(0) instanceof AsyncAppender);
fnames = util.getFileNames(fileName);
util.waitReplication(fs, fileName, (short)3);
userGroupInfo = UserGroupInformation.createUserForTesting(username, groups);
LOG.info("Audit log file: {}, exists: {}, length: {}", AUDIT_LOG_FILE, AUDIT_LOG_FILE.exists(),
AUDIT_LOG_FILE.length());
}
@After
public void teardownCluster() throws Exception {
try (PrintWriter writer = new PrintWriter(AUDIT_LOG_FILE)) {
writer.print("");
}
util.cleanup(fs, "/srcdat");
if (fs != null) {
fs.close();
@ -159,11 +164,10 @@ public void testAuditAllowed() throws Exception {
final Path file = new Path(fnames[0]);
FileSystem userfs = DFSTestUtil.getFileSystemAs(userGroupInfo, conf);
setupAuditLogs();
InputStream istream = userfs.open(file);
int val = istream.read();
istream.close();
verifyAuditLogs(true);
verifySuccessCommandsAuditLogs(2, fnames[0], "cmd=open");
assertTrue("failed to read from file", val >= 0);
}
@ -173,9 +177,8 @@ public void testAuditAllowedStat() throws Exception {
final Path file = new Path(fnames[0]);
FileSystem userfs = DFSTestUtil.getFileSystemAs(userGroupInfo, conf);
setupAuditLogs();
FileStatus st = userfs.getFileStatus(file);
verifyAuditLogs(true);
verifySuccessCommandsAuditLogs(2, fnames[0], "cmd=getfileinfo");
assertTrue("failed to stat file", st != null && st.isFile());
}
@ -188,15 +191,13 @@ public void testAuditDenied() throws Exception {
fs.setPermission(file, new FsPermission((short)0600));
fs.setOwner(file, "root", null);
setupAuditLogs();
try {
userfs.open(file);
fail("open must not succeed");
} catch(AccessControlException e) {
System.out.println("got access denied, as expected.");
}
verifyAuditLogs(false);
verifyFailedCommandsAuditLogs(1, fnames[0], "cmd=open");
}
/** test that access via webhdfs puts proper entry in audit log */
@ -207,14 +208,12 @@ public void testAuditWebHdfs() throws Exception {
fs.setPermission(file, new FsPermission((short)0644));
fs.setOwner(file, "root", null);
setupAuditLogs();
WebHdfsFileSystem webfs = WebHdfsTestUtil.getWebHdfsFileSystemAs(userGroupInfo, conf, WebHdfsConstants.WEBHDFS_SCHEME);
InputStream istream = webfs.open(file);
int val = istream.read();
istream.close();
verifyAuditLogsRepeat(true, 3);
verifySuccessCommandsAuditLogs(3, fnames[0], "cmd=open");
assertTrue("failed to read from file", val >= 0);
}
@ -226,12 +225,10 @@ public void testAuditWebHdfsStat() throws Exception {
fs.setPermission(file, new FsPermission((short)0644));
fs.setOwner(file, "root", null);
setupAuditLogs();
WebHdfsFileSystem webfs = WebHdfsTestUtil.getWebHdfsFileSystemAs(userGroupInfo, conf, WebHdfsConstants.WEBHDFS_SCHEME);
FileStatus st = webfs.getFileStatus(file);
verifyAuditLogs(true);
verifySuccessCommandsAuditLogs(2, fnames[0], "cmd=getfileinfo");
assertTrue("failed to stat file", st != null && st.isFile());
}
@ -243,7 +240,6 @@ public void testAuditWebHdfsDenied() throws Exception {
fs.setPermission(file, new FsPermission((short)0600));
fs.setOwner(file, "root", null);
setupAuditLogs();
try {
WebHdfsFileSystem webfs = WebHdfsTestUtil.getWebHdfsFileSystemAs(userGroupInfo, conf, WebHdfsConstants.WEBHDFS_SCHEME);
InputStream istream = webfs.open(file);
@ -252,7 +248,7 @@ public void testAuditWebHdfsDenied() throws Exception {
} catch(AccessControlException E) {
System.out.println("got access denied, as expected.");
}
verifyAuditLogsRepeat(false, 2);
verifyFailedCommandsAuditLogs(1, fnames[0], "cmd=open");
}
/** test that open via webhdfs puts proper entry in audit log */
@ -263,124 +259,68 @@ public void testAuditWebHdfsOpen() throws Exception {
fs.setPermission(file, new FsPermission((short)0644));
fs.setOwner(file, "root", null);
setupAuditLogs();
WebHdfsFileSystem webfs = WebHdfsTestUtil.getWebHdfsFileSystemAs(userGroupInfo, conf, WebHdfsConstants.WEBHDFS_SCHEME);
webfs.open(file).read();
verifyAuditLogsCheckPattern(true, 3, webOpenPattern);
verifySuccessCommandsAuditLogs(3, fnames[0], "cmd=open");
}
/** make sure that "\r\n" isn't made into a newline in audit log */
@Test
public void testAuditCharacterEscape() throws Exception {
final Path file = new Path("foo" + "\r\n" + "bar");
setupAuditLogs();
fs.create(file);
verifyAuditLogsRepeat(true, 1);
verifySuccessCommandsAuditLogs(1, "foo", "cmd=create");
}
/** Sets up log4j logger for auditlogs */
private void setupAuditLogs() throws IOException {
Logger logger = FSNamesystem.AUDIT_LOG;
// enable logging now that the test is ready to run
logger.setLevel(Level.INFO);
}
private void configureAuditLogs() throws IOException {
// Shutdown the LogManager to release all logger open file handles.
// Unfortunately, Apache commons logging library does not provide
// means to release underlying loggers. For additional info look up
// commons library FAQ.
LogManager.shutdown();
File file = new File(auditLogFile);
if (file.exists()) {
assertTrue(file.delete());
}
// disable logging while the cluster startup preps files
disableAuditLog();
PatternLayout layout = new PatternLayout("%m%n");
RollingFileAppender appender = new RollingFileAppender(layout, auditLogFile);
Logger logger = FSNamesystem.AUDIT_LOG;
logger.addAppender(appender);
}
// Ensure audit log has only one entry
private void verifyAuditLogs(boolean expectSuccess) throws IOException {
verifyAuditLogsRepeat(expectSuccess, 1);
}
// Ensure audit log has exactly N entries
private void verifyAuditLogsRepeat(boolean expectSuccess, int ndupe)
private void verifySuccessCommandsAuditLogs(int leastExpected, String file, String cmd)
throws IOException {
// Turn off the logs
disableAuditLog();
// Close the appenders and force all logs to be flushed
Logger logger = FSNamesystem.AUDIT_LOG;
Enumeration<?> appenders = logger.getAllAppenders();
while (appenders.hasMoreElements()) {
Appender appender = (Appender)appenders.nextElement();
appender.close();
}
BufferedReader reader = new BufferedReader(new FileReader(auditLogFile));
String line = null;
boolean ret = true;
try {
for (int i = 0; i < ndupe; i++) {
line = reader.readLine();
try (BufferedReader reader = new BufferedReader(new FileReader(AUDIT_LOG_FILE))) {
String line;
int success = 0;
while ((line = reader.readLine()) != null) {
assertNotNull(line);
assertTrue("Expected audit event not found in audit log",
auditPattern.matcher(line).matches());
ret &= successPattern.matcher(line).matches();
}
assertNull("Unexpected event in audit log", reader.readLine());
assertTrue("Expected success=" + expectSuccess, ret == expectSuccess);
} finally {
reader.close();
}
}
// Ensure audit log has exactly N entries
private void verifyAuditLogsCheckPattern(boolean expectSuccess, int ndupe, Pattern pattern)
throws IOException {
// Turn off the logs
disableAuditLog();
// Close the appenders and force all logs to be flushed
Logger logger = FSNamesystem.AUDIT_LOG;
Enumeration<?> appenders = logger.getAllAppenders();
while (appenders.hasMoreElements()) {
Appender appender = (Appender)appenders.nextElement();
appender.close();
}
BufferedReader reader = new BufferedReader(new FileReader(auditLogFile));
String line = null;
boolean ret = true;
boolean patternMatches = false;
try {
for (int i = 0; i < ndupe; i++) {
line = reader.readLine();
assertNotNull(line);
patternMatches |= pattern.matcher(line).matches();
ret &= successPattern.matcher(line).matches();
LOG.info("Line: {}", line);
if (SUCCESS_PATTERN.matcher(line).matches() && line.contains(file) && line.contains(
cmd)) {
assertTrue("Expected audit event not found in audit log",
AUDIT_PATTERN.matcher(line).matches());
LOG.info("Successful verification. Log line: {}", line);
success++;
}
assertNull("Unexpected event in audit log", reader.readLine());
assertTrue("Expected audit event not found in audit log", patternMatches);
assertTrue("Expected success=" + expectSuccess, ret == expectSuccess);
} finally {
reader.close();
}
if (success < leastExpected) {
throw new AssertionError(
"Least expected: " + leastExpected + ". Actual success: " + success);
}
}
}
private void disableAuditLog() {
GenericTestUtils.disableLog(LoggerFactory.getLogger(
FSNamesystem.class.getName() + ".audit"));
private void verifyFailedCommandsAuditLogs(int leastExpected, String file, String cmd)
throws IOException {
try (BufferedReader reader = new BufferedReader(new FileReader(AUDIT_LOG_FILE))) {
String line;
int success = 0;
while ((line = reader.readLine()) != null) {
assertNotNull(line);
LOG.info("Line: {}", line);
if (FAILURE_PATTERN.matcher(line).matches() && line.contains(file) && line.contains(
cmd)) {
assertTrue("Expected audit event not found in audit log",
AUDIT_PATTERN.matcher(line).matches());
LOG.info("Failure verification. Log line: {}", line);
success++;
}
}
assertEquals("Expected: " + leastExpected + ". Actual failure: " + success, leastExpected,
success);
if (success < leastExpected) {
throw new AssertionError(
"Least expected: " + leastExpected + ". Actual success: " + success);
}
}
}
}

View File

@ -23,7 +23,6 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
@ -119,11 +118,8 @@
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.log4j.PatternLayout;
import org.apache.log4j.RollingFileAppender;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@ -136,8 +132,8 @@ public class TestFsck {
private static final org.slf4j.Logger LOG =
LoggerFactory.getLogger(TestFsck.class.getName());
static final String AUDITLOG_FILE =
GenericTestUtils.getTempPath("TestFsck-audit.log");
private static final File AUDIT_LOG_FILE =
new File(System.getProperty("hadoop.log.dir"), "hdfs-audit.log");
// Pattern for:
// allowed=true ugi=name ip=/address cmd=FSCK src=/ dst=null perm=null
@ -195,6 +191,11 @@ public void tearDown() throws Exception {
shutdownCluster();
}
@AfterClass
public static void afterClass() throws Exception {
assertTrue(AUDIT_LOG_FILE.delete());
}
private void shutdownCluster() throws Exception {
if (cluster != null) {
cluster.shutdown();
@ -221,7 +222,6 @@ public void testFsck() throws Exception {
final Path file = new Path(fileName);
long aTime = fs.getFileStatus(file).getAccessTime();
Thread.sleep(precision);
setupAuditLogs();
String outStr = runFsck(conf, 0, true, "/");
verifyAuditLogs();
assertEquals(aTime, fs.getFileStatus(file).getAccessTime());
@ -245,54 +245,27 @@ public void testFsck() throws Exception {
util.cleanup(fs, "/srcdat");
}
/** Sets up log4j logger for auditlogs. */
private void setupAuditLogs() throws IOException {
File file = new File(AUDITLOG_FILE);
if (file.exists()) {
file.delete();
}
Logger logger = FSNamesystem.AUDIT_LOG;
logger.removeAllAppenders();
logger.setLevel(Level.INFO);
PatternLayout layout = new PatternLayout("%m%n");
RollingFileAppender appender =
new RollingFileAppender(layout, AUDITLOG_FILE);
logger.addAppender(appender);
}
private void verifyAuditLogs() throws IOException {
// Turn off the logs
GenericTestUtils.disableLog(LoggerFactory.getLogger(
FSNamesystem.class.getName() + ".audit"));
BufferedReader reader = null;
try {
try (BufferedReader reader = new BufferedReader(new FileReader(AUDIT_LOG_FILE))) {
// Audit log should contain one getfileinfo and one fsck
reader = new BufferedReader(new FileReader(AUDITLOG_FILE));
String line;
// one extra getfileinfo stems from resolving the path
//
for (int i = 0; i < 2; i++) {
line = reader.readLine();
assertNotNull(line);
assertTrue("Expected getfileinfo event not found in audit log",
GET_FILE_INFO_PATTERN.matcher(line).matches());
int getFileStatusSuccess = 0;
int fsckCount = 0;
while ((line = reader.readLine()) != null) {
LOG.info("Line: {}", line);
if (line.contains("cmd=getfileinfo") && GET_FILE_INFO_PATTERN.matcher(line).matches()) {
getFileStatusSuccess++;
} else if (FSCK_PATTERN.matcher(line).matches()) {
fsckCount++;
}
}
line = reader.readLine();
assertNotNull(line);
assertTrue("Expected fsck event not found in audit log", FSCK_PATTERN
.matcher(line).matches());
assertNull("Unexpected event in audit log", reader.readLine());
} finally {
// Close the reader and remove the appender to release the audit log file
// handle after verifying the content of the file.
if (reader != null) {
reader.close();
if (getFileStatusSuccess < 2) {
throw new AssertionError(
"getfileinfo cmd should occur at least 2 times. Actual count: " + getFileStatusSuccess);
}
Logger logger = FSNamesystem.AUDIT_LOG;
if (logger != null) {
logger.removeAllAppenders();
if (fsckCount < 1) {
throw new AssertionError(
"fsck should be present at least once. Actual count: " + fsckCount);
}
}
}
@ -1411,7 +1384,6 @@ public void testFsckSymlink() throws Exception {
util.waitReplication(fs, fileName, (short)3);
long aTime = fc.getFileStatus(symlink).getAccessTime();
Thread.sleep(precision);
setupAuditLogs();
String outStr = runFsck(conf, 0, true, "/");
verifyAuditLogs();
assertEquals(aTime, fc.getFileStatus(symlink).getAccessTime());
@ -2055,7 +2027,6 @@ public void testECFsck() throws Exception {
long replTime = fs.getFileStatus(replFilePath).getAccessTime();
long ecTime = fs.getFileStatus(largeFilePath).getAccessTime();
Thread.sleep(precision);
setupAuditLogs();
String outStr = runFsck(conf, 0, true, "/");
verifyAuditLogs();
assertEquals(replTime, fs.getFileStatus(replFilePath).getAccessTime());

View File

@ -26,9 +26,8 @@
import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.log4j.Appender;
import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.AsyncAppender;
import org.apache.log4j.spi.LoggingEvent;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
@ -37,7 +36,6 @@
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeoutException;
import java.util.regex.Pattern;
import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
import static org.junit.Assert.*;
@ -86,8 +84,8 @@ public void testMetricsLogOutput()
"DummyMetrics", metricsProvider);
makeNameNode(true); // Log metrics early and often.
final PatternMatchingAppender appender =
new PatternMatchingAppender("^.*FakeMetric42.*$");
addAppender(org.apache.log4j.Logger.getLogger(NameNode.METRICS_LOG_NAME), appender);
(PatternMatchingAppender) org.apache.log4j.Logger.getLogger(NameNode.METRICS_LOG_NAME)
.getAppender("PATTERNMATCHERAPPENDER");
// Ensure that the supplied pattern was matched.
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@ -115,12 +113,6 @@ private NameNode makeNameNode(boolean enableMetricsLogging)
return new TestNameNode(conf);
}
private void addAppender(org.apache.log4j.Logger logger, Appender appender) {
@SuppressWarnings("unchecked")
List<Appender> appenders = Collections.list(logger.getAllAppenders());
((AsyncAppender) appenders.get(0)).addAppender(appender);
}
/**
* A NameNode that stubs out the NameSystem for testing.
*/
@ -149,37 +141,4 @@ public int getFakeMetric42() {
}
}
/**
* An appender that matches logged messages against the given
* regular expression.
*/
public static class PatternMatchingAppender extends AppenderSkeleton {
private final Pattern pattern;
private volatile boolean matched;
public PatternMatchingAppender(String pattern) {
this.pattern = Pattern.compile(pattern);
this.matched = false;
}
public boolean isMatched() {
return matched;
}
@Override
protected void append(LoggingEvent event) {
if (pattern.matcher(event.getMessage().toString()).matches()) {
matched = true;
}
}
@Override
public void close() {
}
@Override
public boolean requiresLayout() {
return false;
}
}
}

View File

@ -45,7 +45,7 @@
*/
public class TestDNFencingWithReplication {
static {
GenericTestUtils.setLogLevel(FSNamesystem.AUDIT_LOG, org.apache.log4j.Level.WARN);
GenericTestUtils.setLogLevel(FSNamesystem.AUDIT_LOG, Level.WARN);
GenericTestUtils.setLogLevel(Server.LOG, Level.ERROR);
GenericTestUtils.setLogLevel(RetryInvocationHandler.LOG, Level.ERROR);
}

View File

@ -22,31 +22,60 @@ log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} (%F:%M(%L)) - %m%n
# Only to be used for testing
log4j.appender.PATTERNMATCHERAPPENDER=org.apache.hadoop.hdfs.server.namenode.PatternMatchingAppender
#
# NameNode metrics logging.
# The default is to retain two namenode-metrics.log files up to 64MB each.
#
log4j.logger.NameNodeMetricsLog=INFO,NNMETRICSRFA
# TODO : While migrating to log4j2, replace AsyncRFAAppender with AsyncAppender as
# log4j2 properties support wrapping of other appenders to AsyncAppender using appender ref
namenode.metrics.logger=INFO,ASYNCNNMETRICSRFA,PATTERNMATCHERAPPENDER
log4j.logger.NameNodeMetricsLog=${namenode.metrics.logger}
log4j.additivity.NameNodeMetricsLog=false
log4j.appender.NNMETRICSRFA=org.apache.log4j.RollingFileAppender
log4j.appender.NNMETRICSRFA.File=${hadoop.log.dir}/namenode-metrics.log
log4j.appender.NNMETRICSRFA.layout=org.apache.log4j.PatternLayout
log4j.appender.NNMETRICSRFA.layout.ConversionPattern=%d{ISO8601} %m%n
log4j.appender.NNMETRICSRFA.MaxBackupIndex=1
log4j.appender.NNMETRICSRFA.MaxFileSize=64MB
log4j.appender.ASYNCNNMETRICSRFA=org.apache.hadoop.hdfs.util.AsyncRFAAppender
log4j.appender.ASYNCNNMETRICSRFA.conversionPattern=%d{ISO8601} %m%n
log4j.appender.ASYNCNNMETRICSRFA.maxFileSize=64MB
log4j.appender.ASYNCNNMETRICSRFA.fileName=${hadoop.log.dir}/namenode-metrics.log
log4j.appender.ASYNCNNMETRICSRFA.maxBackupIndex=1
#
# DataNode metrics logging.
# The default is to retain two datanode-metrics.log files up to 64MB each.
#
log4j.logger.DataNodeMetricsLog=INFO,DNMETRICSRFA
# TODO : While migrating to log4j2, replace AsyncRFAAppender with AsyncAppender as
# log4j2 properties support wrapping of other appenders to AsyncAppender using appender ref
datanode.metrics.logger=INFO,ASYNCDNMETRICSRFA,PATTERNMATCHERAPPENDER
log4j.logger.DataNodeMetricsLog=${datanode.metrics.logger}
log4j.additivity.DataNodeMetricsLog=false
log4j.appender.DNMETRICSRFA=org.apache.log4j.RollingFileAppender
log4j.appender.DNMETRICSRFA.File=${hadoop.log.dir}/datanode-metrics.log
log4j.appender.DNMETRICSRFA.layout=org.apache.log4j.PatternLayout
log4j.appender.DNMETRICSRFA.layout.ConversionPattern=%d{ISO8601} %m%n
log4j.appender.DNMETRICSRFA.MaxBackupIndex=1
log4j.appender.DNMETRICSRFA.MaxFileSize=64MB
log4j.appender.ASYNCDNMETRICSRFA=org.apache.hadoop.hdfs.util.AsyncRFAAppender
log4j.appender.ASYNCDNMETRICSRFA.conversionPattern=%d{ISO8601} %m%n
log4j.appender.ASYNCDNMETRICSRFA.maxFileSize=64MB
log4j.appender.ASYNCDNMETRICSRFA.fileName=${hadoop.log.dir}/datanode-metrics.log
log4j.appender.ASYNCDNMETRICSRFA.maxBackupIndex=1
# Supress KMS error log
log4j.logger.com.sun.jersey.server.wadl.generators.WadlGeneratorJAXBGrammarGenerator=OFF
#
# hdfs audit logging
#
# TODO : While migrating to log4j2, replace AsyncRFAAppender with AsyncAppender as
# log4j2 properties support wrapping of other appenders to AsyncAppender using appender ref
hdfs.audit.logger=INFO,ASYNCAUDITAPPENDER
hdfs.audit.log.maxfilesize=256MB
hdfs.audit.log.maxbackupindex=20
log4j.logger.org.apache.hadoop.hdfs.server.namenode.FSNamesystem.audit=${hdfs.audit.logger}
log4j.additivity.org.apache.hadoop.hdfs.server.namenode.FSNamesystem.audit=false
log4j.appender.ASYNCAUDITAPPENDER=org.apache.hadoop.hdfs.util.AsyncRFAAppender
log4j.appender.ASYNCAUDITAPPENDER.blocking=false
log4j.appender.ASYNCAUDITAPPENDER.bufferSize=256
log4j.appender.ASYNCAUDITAPPENDER.conversionPattern=%m%n
log4j.appender.ASYNCAUDITAPPENDER.maxFileSize=${hdfs.audit.log.maxfilesize}
log4j.appender.ASYNCAUDITAPPENDER.fileName=${hadoop.log.dir}/hdfs-audit.log
log4j.appender.ASYNCAUDITAPPENDER.maxBackupIndex=${hdfs.audit.log.maxbackupindex}