HADOOP-18631. (ADDENDUM) Use LogCapturer to match audit log pattern and remove hdfs async audit log configs (#5451)
This commit is contained in:
parent
fa723ae839
commit
b6a9d7b442
@ -733,43 +733,6 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
||||
public static final String DFS_NAMENODE_DEFAULT_AUDIT_LOGGER_NAME = "default";
|
||||
public static final String DFS_NAMENODE_AUDIT_LOG_TOKEN_TRACKING_ID_KEY = "dfs.namenode.audit.log.token.tracking.id";
|
||||
public static final boolean DFS_NAMENODE_AUDIT_LOG_TOKEN_TRACKING_ID_DEFAULT = false;
|
||||
/**
|
||||
* Deprecated. Use log4j properties instead.
|
||||
* Set system env variable HDFS_AUDIT_LOGGER, which in tern assigns the value to
|
||||
* "hdfs.audit.logger" for log4j properties to determine log level and appender.
|
||||
*/
|
||||
@Deprecated
|
||||
public static final String DFS_NAMENODE_AUDIT_LOG_ASYNC_KEY = "dfs.namenode.audit.log.async";
|
||||
@Deprecated
|
||||
public static final boolean DFS_NAMENODE_AUDIT_LOG_ASYNC_DEFAULT = false;
|
||||
|
||||
/**
|
||||
* Deprecated. Use log4j properties instead.
|
||||
* Set value to Async appender "blocking" property as part of log4j properties configuration.
|
||||
* <p>
|
||||
* For example,
|
||||
* log4j.appender.ASYNCAPPENDER=org.apache.log4j.AsyncAppender
|
||||
* log4j.appender.ASYNCAPPENDER.blocking=false
|
||||
*/
|
||||
@Deprecated
|
||||
public static final String DFS_NAMENODE_AUDIT_LOG_ASYNC_BLOCKING_KEY =
|
||||
"dfs.namenode.audit.log.async.blocking";
|
||||
@Deprecated
|
||||
public static final boolean DFS_NAMENODE_AUDIT_LOG_ASYNC_BLOCKING_DEFAULT = true;
|
||||
|
||||
/**
|
||||
* Deprecated. Use log4j properties instead.
|
||||
* Set value to Async appender "bufferSize" property as part of log4j properties configuration.
|
||||
* <p>
|
||||
* For example,
|
||||
* log4j.appender.ASYNCAPPENDER=org.apache.log4j.AsyncAppender
|
||||
* log4j.appender.ASYNCAPPENDER.bufferSize=128
|
||||
*/
|
||||
@Deprecated
|
||||
public static final String DFS_NAMENODE_AUDIT_LOG_ASYNC_BUFFER_SIZE_KEY =
|
||||
"dfs.namenode.audit.log.async.buffer.size";
|
||||
@Deprecated
|
||||
public static final int DFS_NAMENODE_AUDIT_LOG_ASYNC_BUFFER_SIZE_DEFAULT = 128;
|
||||
public static final String DFS_NAMENODE_AUDIT_LOG_DEBUG_CMDLIST = "dfs.namenode.audit.log.debug.cmdlist";
|
||||
public static final String DFS_NAMENODE_METRICS_LOGGER_PERIOD_SECONDS_KEY =
|
||||
"dfs.namenode.metrics.logger.period.seconds";
|
||||
|
@ -48,8 +48,6 @@
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_STANDBY_CHECKPOINTS_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_STANDBY_CHECKPOINTS_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AUDIT_LOGGERS_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_ASYNC_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_ASYNC_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_TOKEN_TRACKING_ID_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_TOKEN_TRACKING_ID_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_DEFAULT;
|
||||
@ -1069,11 +1067,11 @@ static FSNamesystem loadFromDisk(Configuration conf) throws IOException {
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
private static void checkForAsyncLogEnabledByOldConfigs(Configuration conf) {
|
||||
if (conf.getBoolean(DFS_NAMENODE_AUDIT_LOG_ASYNC_KEY, DFS_NAMENODE_AUDIT_LOG_ASYNC_DEFAULT)) {
|
||||
LOG.warn("Use log4j properties to enable async log for audit logs. {} is deprecated",
|
||||
DFS_NAMENODE_AUDIT_LOG_ASYNC_KEY);
|
||||
// dfs.namenode.audit.log.async is no longer in use. Use log4j properties instead.
|
||||
if (conf.getBoolean("dfs.namenode.audit.log.async", false)) {
|
||||
LOG.warn("Use log4j properties to enable async log for audit logs. "
|
||||
+ "dfs.namenode.audit.log.async is no longer in use.");
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -5099,35 +5099,6 @@
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.namenode.audit.log.async</name>
|
||||
<value>false</value>
|
||||
<description>
|
||||
If true, enables asynchronous audit log.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.namenode.audit.log.async.blocking</name>
|
||||
<value>true</value>
|
||||
<description>
|
||||
Only used when enables asynchronous audit log. Sets whether audit log async
|
||||
appender should wait if there is no space available in the event buffer or
|
||||
immediately return. Default value is true.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.namenode.audit.log.async.buffer.size</name>
|
||||
<value>128</value>
|
||||
<description>
|
||||
Only used when enables asynchronous audit log. Sets the number of audit
|
||||
logs allowed in the event buffer before the calling thread is blocked
|
||||
(if dfs.namenode.audit.log.async.blocking is true) or until logs are
|
||||
summarized and discarded. Default value is 128.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.namenode.audit.log.token.tracking.id</name>
|
||||
<value>false</value>
|
||||
|
@ -20,12 +20,7 @@
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.File;
|
||||
import java.io.FileReader;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.PrintWriter;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
@ -46,12 +41,15 @@
|
||||
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
|
||||
import org.apache.hadoop.security.AccessControlException;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
|
||||
import org.apache.log4j.Appender;
|
||||
import org.apache.log4j.AsyncAppender;
|
||||
import org.apache.log4j.Logger;
|
||||
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
@ -66,11 +64,10 @@ public class TestAuditLogs {
|
||||
|
||||
private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(TestAuditLogs.class);
|
||||
|
||||
private static final File AUDIT_LOG_FILE =
|
||||
new File(System.getProperty("hadoop.log.dir"), "hdfs-audit.log");
|
||||
|
||||
final boolean useAsyncEdits;
|
||||
|
||||
private static LogCapturer auditLogCapture;
|
||||
|
||||
@Parameters
|
||||
public static Collection<Object[]> data() {
|
||||
Collection<Object[]> params = new ArrayList<>();
|
||||
@ -111,9 +108,6 @@ public TestAuditLogs(boolean useAsyncEdits) {
|
||||
|
||||
@Before
|
||||
public void setupCluster() throws Exception {
|
||||
try (PrintWriter writer = new PrintWriter(AUDIT_LOG_FILE)) {
|
||||
writer.print("");
|
||||
}
|
||||
// must configure prior to instantiating the namesystem because it
|
||||
// will reconfigure the logger if async is enabled
|
||||
conf = new HdfsConfiguration();
|
||||
@ -132,21 +126,15 @@ public void setupCluster() throws Exception {
|
||||
"org.apache.hadoop.hdfs.server.namenode.FSNamesystem.audit");
|
||||
@SuppressWarnings("unchecked")
|
||||
List<Appender> appenders = Collections.list(logger.getAllAppenders());
|
||||
assertEquals(1, appenders.size());
|
||||
assertTrue(appenders.get(0) instanceof AsyncAppender);
|
||||
|
||||
fnames = util.getFileNames(fileName);
|
||||
util.waitReplication(fs, fileName, (short)3);
|
||||
userGroupInfo = UserGroupInformation.createUserForTesting(username, groups);
|
||||
LOG.info("Audit log file: {}, exists: {}, length: {}", AUDIT_LOG_FILE, AUDIT_LOG_FILE.exists(),
|
||||
AUDIT_LOG_FILE.length());
|
||||
}
|
||||
|
||||
@After
|
||||
public void teardownCluster() throws Exception {
|
||||
try (PrintWriter writer = new PrintWriter(AUDIT_LOG_FILE)) {
|
||||
writer.print("");
|
||||
}
|
||||
util.cleanup(fs, "/srcdat");
|
||||
if (fs != null) {
|
||||
fs.close();
|
||||
@ -158,6 +146,17 @@ public void teardownCluster() throws Exception {
|
||||
}
|
||||
}
|
||||
|
||||
@BeforeClass
|
||||
public static void beforeClass() {
|
||||
auditLogCapture = LogCapturer.captureLogs(FSNamesystem.AUDIT_LOG);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void afterClass() {
|
||||
auditLogCapture.stopCapturing();
|
||||
}
|
||||
|
||||
|
||||
/** test that allowed operation puts proper entry in audit log */
|
||||
@Test
|
||||
public void testAuditAllowed() throws Exception {
|
||||
@ -273,54 +272,47 @@ public void testAuditCharacterEscape() throws Exception {
|
||||
verifySuccessCommandsAuditLogs(1, "foo", "cmd=create");
|
||||
}
|
||||
|
||||
private void verifySuccessCommandsAuditLogs(int leastExpected, String file, String cmd)
|
||||
throws IOException {
|
||||
|
||||
try (BufferedReader reader = new BufferedReader(new FileReader(AUDIT_LOG_FILE))) {
|
||||
String line;
|
||||
int success = 0;
|
||||
while ((line = reader.readLine()) != null) {
|
||||
assertNotNull(line);
|
||||
LOG.info("Line: {}", line);
|
||||
if (SUCCESS_PATTERN.matcher(line).matches() && line.contains(file) && line.contains(
|
||||
cmd)) {
|
||||
assertTrue("Expected audit event not found in audit log",
|
||||
AUDIT_PATTERN.matcher(line).matches());
|
||||
LOG.info("Successful verification. Log line: {}", line);
|
||||
success++;
|
||||
}
|
||||
private void verifySuccessCommandsAuditLogs(int leastExpected, String file, String cmd) {
|
||||
String[] auditLogOutputLines = auditLogCapture.getOutput().split("\\n");
|
||||
int success = 0;
|
||||
for (String auditLogLine : auditLogOutputLines) {
|
||||
if (!auditLogLine.contains("allowed=")) {
|
||||
continue;
|
||||
}
|
||||
if (success < leastExpected) {
|
||||
throw new AssertionError(
|
||||
"Least expected: " + leastExpected + ". Actual success: " + success);
|
||||
String line = "allowed=" + auditLogLine.split("allowed=")[1];
|
||||
LOG.info("Line: {}", line);
|
||||
if (SUCCESS_PATTERN.matcher(line).matches() && line.contains(file) && line.contains(cmd)) {
|
||||
assertTrue("Expected audit event not found in audit log",
|
||||
AUDIT_PATTERN.matcher(line).matches());
|
||||
LOG.info("Successful verification. Log line: {}", line);
|
||||
success++;
|
||||
}
|
||||
}
|
||||
if (success < leastExpected) {
|
||||
throw new AssertionError(
|
||||
"Least expected: " + leastExpected + ". Actual success: " + success);
|
||||
}
|
||||
}
|
||||
|
||||
private void verifyFailedCommandsAuditLogs(int leastExpected, String file, String cmd)
|
||||
throws IOException {
|
||||
|
||||
try (BufferedReader reader = new BufferedReader(new FileReader(AUDIT_LOG_FILE))) {
|
||||
String line;
|
||||
int success = 0;
|
||||
while ((line = reader.readLine()) != null) {
|
||||
assertNotNull(line);
|
||||
LOG.info("Line: {}", line);
|
||||
if (FAILURE_PATTERN.matcher(line).matches() && line.contains(file) && line.contains(
|
||||
cmd)) {
|
||||
assertTrue("Expected audit event not found in audit log",
|
||||
AUDIT_PATTERN.matcher(line).matches());
|
||||
LOG.info("Failure verification. Log line: {}", line);
|
||||
success++;
|
||||
}
|
||||
private void verifyFailedCommandsAuditLogs(int expected, String file, String cmd) {
|
||||
String[] auditLogOutputLines = auditLogCapture.getOutput().split("\\n");
|
||||
int success = 0;
|
||||
for (String auditLogLine : auditLogOutputLines) {
|
||||
if (!auditLogLine.contains("allowed=")) {
|
||||
continue;
|
||||
}
|
||||
assertEquals("Expected: " + leastExpected + ". Actual failure: " + success, leastExpected,
|
||||
success);
|
||||
if (success < leastExpected) {
|
||||
throw new AssertionError(
|
||||
"Least expected: " + leastExpected + ". Actual success: " + success);
|
||||
String line = "allowed=" + auditLogLine.split("allowed=")[1];
|
||||
LOG.info("Line: {}", line);
|
||||
if (FAILURE_PATTERN.matcher(line).matches() && line.contains(file) && line.contains(
|
||||
cmd)) {
|
||||
assertTrue("Expected audit event not found in audit log",
|
||||
AUDIT_PATTERN.matcher(line).matches());
|
||||
LOG.info("Failure verification. Log line: {}", line);
|
||||
success++;
|
||||
}
|
||||
}
|
||||
assertEquals("Expected: " + expected + ". Actual failure: " + success, expected,
|
||||
success);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -30,12 +30,10 @@
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.File;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.FileReader;
|
||||
import java.io.IOException;
|
||||
import java.io.PrintStream;
|
||||
import java.io.PrintWriter;
|
||||
@ -117,11 +115,13 @@
|
||||
import org.apache.hadoop.security.AccessControlException;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
|
||||
import org.apache.hadoop.util.ToolRunner;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@ -132,10 +132,7 @@ public class TestFsck {
|
||||
private static final org.slf4j.Logger LOG =
|
||||
LoggerFactory.getLogger(TestFsck.class.getName());
|
||||
|
||||
private static final File AUDIT_LOG_FILE =
|
||||
new File(System.getProperty("hadoop.log.dir"), "hdfs-audit.log");
|
||||
|
||||
// Pattern for:
|
||||
// Pattern for:
|
||||
// allowed=true ugi=name ip=/address cmd=FSCK src=/ dst=null perm=null
|
||||
static final Pattern FSCK_PATTERN = Pattern.compile(
|
||||
"allowed=.*?\\s" +
|
||||
@ -159,6 +156,8 @@ public class TestFsck {
|
||||
private static final String LINE_SEPARATOR =
|
||||
System.getProperty("line.separator");
|
||||
|
||||
private static LogCapturer auditLogCapture;
|
||||
|
||||
public static String runFsck(Configuration conf, int expectedErrCode,
|
||||
boolean checkErrorCode, String... path)
|
||||
throws Exception {
|
||||
@ -179,6 +178,16 @@ public static String runFsck(Configuration conf, int expectedErrCode,
|
||||
private MiniDFSCluster cluster = null;
|
||||
private Configuration conf = null;
|
||||
|
||||
@BeforeClass
|
||||
public static void beforeClass() {
|
||||
auditLogCapture = LogCapturer.captureLogs(FSNamesystem.AUDIT_LOG);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void afterClass() {
|
||||
auditLogCapture.stopCapturing();
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
conf = new Configuration();
|
||||
@ -191,11 +200,6 @@ public void tearDown() throws Exception {
|
||||
shutdownCluster();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void afterClass() throws Exception {
|
||||
assertTrue(AUDIT_LOG_FILE.delete());
|
||||
}
|
||||
|
||||
private void shutdownCluster() throws Exception {
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
@ -245,29 +249,30 @@ public void testFsck() throws Exception {
|
||||
util.cleanup(fs, "/srcdat");
|
||||
}
|
||||
|
||||
private void verifyAuditLogs() throws IOException {
|
||||
try (BufferedReader reader = new BufferedReader(new FileReader(AUDIT_LOG_FILE))) {
|
||||
// Audit log should contain one getfileinfo and one fsck
|
||||
String line;
|
||||
int getFileStatusSuccess = 0;
|
||||
int fsckCount = 0;
|
||||
while ((line = reader.readLine()) != null) {
|
||||
LOG.info("Line: {}", line);
|
||||
if (line.contains("cmd=getfileinfo") && GET_FILE_INFO_PATTERN.matcher(line).matches()) {
|
||||
getFileStatusSuccess++;
|
||||
} else if (FSCK_PATTERN.matcher(line).matches()) {
|
||||
fsckCount++;
|
||||
}
|
||||
private void verifyAuditLogs() {
|
||||
String[] auditLogOutputLines = auditLogCapture.getOutput().split("\\n");
|
||||
int fileStatusSuccess = 0;
|
||||
int fsckCount = 0;
|
||||
for (String auditLogLine : auditLogOutputLines) {
|
||||
if (!auditLogLine.contains("allowed=")) {
|
||||
continue;
|
||||
}
|
||||
if (getFileStatusSuccess < 2) {
|
||||
throw new AssertionError(
|
||||
"getfileinfo cmd should occur at least 2 times. Actual count: " + getFileStatusSuccess);
|
||||
}
|
||||
if (fsckCount < 1) {
|
||||
throw new AssertionError(
|
||||
"fsck should be present at least once. Actual count: " + fsckCount);
|
||||
String extractedAuditLog = "allowed=" + auditLogLine.split("allowed=")[1];
|
||||
LOG.info("Line: {}", extractedAuditLog);
|
||||
if (extractedAuditLog.contains("cmd=getfileinfo") && GET_FILE_INFO_PATTERN.matcher(
|
||||
extractedAuditLog).matches()) {
|
||||
fileStatusSuccess++;
|
||||
} else if (FSCK_PATTERN.matcher(extractedAuditLog).matches()) {
|
||||
fsckCount++;
|
||||
}
|
||||
}
|
||||
if (fileStatusSuccess < 2) {
|
||||
throw new AssertionError(
|
||||
"getfileinfo cmd should occur at least 2 times. Actual count: " + fileStatusSuccess);
|
||||
}
|
||||
if (fsckCount < 1) {
|
||||
throw new AssertionError("fsck should be present at least once. Actual count: " + fsckCount);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
|
Loading…
Reference in New Issue
Block a user