HDFS-6888. Allow selectively audit logging ops (Contributed by Chen He)

This commit is contained in:
Vinayakumar B 2015-05-15 11:05:01 +05:30
parent cb8e69a80c
commit 3bef7c80a9
5 changed files with 152 additions and 2 deletions

View File

@ -552,6 +552,8 @@ Release 2.8.0 - UNRELEASED
HDFS-8350. Remove old webhdfs.xml and other outdated documentation stuff. HDFS-8350. Remove old webhdfs.xml and other outdated documentation stuff.
(Brahma Reddy Battula via aajisaka) (Brahma Reddy Battula via aajisaka)
HDFS-6888. Allow selectively audit logging ops (Chen He via vinayakumarb)
OPTIMIZATIONS OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

View File

@ -341,6 +341,7 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final boolean DFS_NAMENODE_AUDIT_LOG_TOKEN_TRACKING_ID_DEFAULT = false; public static final boolean DFS_NAMENODE_AUDIT_LOG_TOKEN_TRACKING_ID_DEFAULT = false;
public static final String DFS_NAMENODE_AUDIT_LOG_ASYNC_KEY = "dfs.namenode.audit.log.async"; public static final String DFS_NAMENODE_AUDIT_LOG_ASYNC_KEY = "dfs.namenode.audit.log.async";
public static final boolean DFS_NAMENODE_AUDIT_LOG_ASYNC_DEFAULT = false; public static final boolean DFS_NAMENODE_AUDIT_LOG_ASYNC_DEFAULT = false;
public static final String DFS_NAMENODE_AUDIT_LOG_DEBUG_CMDLIST = "dfs.namenode.audit.log.debug.cmdlist";
public static final String DFS_BALANCER_MOVEDWINWIDTH_KEY = "dfs.balancer.movedWinWidth"; public static final String DFS_BALANCER_MOVEDWINWIDTH_KEY = "dfs.balancer.movedWinWidth";
public static final long DFS_BALANCER_MOVEDWINWIDTH_DEFAULT = 5400*1000L; public static final long DFS_BALANCER_MOVEDWINWIDTH_DEFAULT = 5400*1000L;

View File

@ -8149,15 +8149,20 @@ void checkAccess(String src, FsAction mode) throws IOException {
* defined in the config file. It can also be explicitly listed in the * defined in the config file. It can also be explicitly listed in the
* config file. * config file.
*/ */
private static class DefaultAuditLogger extends HdfsAuditLogger { @VisibleForTesting
static class DefaultAuditLogger extends HdfsAuditLogger {
private boolean logTokenTrackingId; private boolean logTokenTrackingId;
private Set<String> debugCmdSet = new HashSet<String>();
@Override @Override
public void initialize(Configuration conf) { public void initialize(Configuration conf) {
logTokenTrackingId = conf.getBoolean( logTokenTrackingId = conf.getBoolean(
DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_TOKEN_TRACKING_ID_KEY, DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_TOKEN_TRACKING_ID_KEY,
DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_TOKEN_TRACKING_ID_DEFAULT); DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_TOKEN_TRACKING_ID_DEFAULT);
debugCmdSet.addAll(Arrays.asList(conf.getTrimmedStrings(
DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_DEBUG_CMDLIST)));
} }
@Override @Override
@ -8165,7 +8170,9 @@ public void logAuditEvent(boolean succeeded, String userName,
InetAddress addr, String cmd, String src, String dst, InetAddress addr, String cmd, String src, String dst,
FileStatus status, UserGroupInformation ugi, FileStatus status, UserGroupInformation ugi,
DelegationTokenSecretManager dtSecretManager) { DelegationTokenSecretManager dtSecretManager) {
if (auditLog.isInfoEnabled()) {
if (auditLog.isDebugEnabled() ||
(auditLog.isInfoEnabled() && !debugCmdSet.contains(cmd))) {
final StringBuilder sb = auditBuffer.get(); final StringBuilder sb = auditBuffer.get();
sb.setLength(0); sb.setLength(0);
sb.append("allowed=").append(succeeded).append("\t"); sb.append("allowed=").append(succeeded).append("\t");

View File

@ -2084,6 +2084,15 @@
</description> </description>
</property> </property>
<property>
<name>dfs.namenode.audit.log.debug.cmdlist</name>
<value></value>
<description>
A comma separated list of NameNode commands that are written to the HDFS
namenode audit log only if the audit log level is debug.
</description>
</property>
<property> <property>
<name>dfs.client.use.legacy.blockreader.local</name> <name>dfs.client.use.legacy.blockreader.local</name>
<value>false</value> <value>false</value>

View File

@ -0,0 +1,131 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.DefaultAuditLogger;
import org.apache.log4j.Level;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import java.net.Inet4Address;
import java.util.Arrays;
import java.util.List;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.*;
/**
* Test that the HDFS Audit logger respects DFS_NAMENODE_AUDIT_LOG_DEBUG_CMDLIST.
*/
public class TestAuditLogAtDebug {
static final Log LOG = LogFactory.getLog(TestAuditLogAtDebug.class);
@Rule
public Timeout timeout = new Timeout(300000);
private static final String DUMMY_COMMAND_1 = "dummycommand1";
private static final String DUMMY_COMMAND_2 = "dummycommand2";
private DefaultAuditLogger makeSpyLogger(
Level level, Optional<List<String>> debugCommands) {
DefaultAuditLogger logger = new DefaultAuditLogger();
Configuration conf = new HdfsConfiguration();
if (debugCommands.isPresent()) {
conf.set(DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_DEBUG_CMDLIST,
Joiner.on(",").join(debugCommands.get()));
}
logger.initialize(conf);
((Log4JLogger) FSNamesystem.auditLog).getLogger().setLevel(level);
return spy(logger);
}
private void logDummyCommandToAuditLog(HdfsAuditLogger logger, String command) {
logger.logAuditEvent(true, "",
Inet4Address.getLoopbackAddress(),
command, "", "",
null, null, null);
}
@Test
public void testDebugCommandNotLoggedAtInfo() {
DefaultAuditLogger logger =
makeSpyLogger(
Level.INFO, Optional.of(Arrays.asList(DUMMY_COMMAND_1)));
logDummyCommandToAuditLog(logger, DUMMY_COMMAND_1);
verify(logger, never()).logAuditMessage(anyString());
}
@Test
public void testDebugCommandLoggedAtDebug() {
DefaultAuditLogger logger =
makeSpyLogger(
Level.DEBUG, Optional.of(Arrays.asList(DUMMY_COMMAND_1)));
logDummyCommandToAuditLog(logger, DUMMY_COMMAND_1);
verify(logger, times(1)).logAuditMessage(anyString());
}
@Test
public void testInfoCommandLoggedAtInfo() {
DefaultAuditLogger logger =
makeSpyLogger(
Level.INFO, Optional.of(Arrays.asList(DUMMY_COMMAND_1)));
logDummyCommandToAuditLog(logger, DUMMY_COMMAND_2);
verify(logger, times(1)).logAuditMessage(anyString());
}
@Test
public void testMultipleDebugCommandsNotLoggedAtInfo() {
DefaultAuditLogger logger =
makeSpyLogger(
Level.INFO,
Optional.of(Arrays.asList(DUMMY_COMMAND_1, DUMMY_COMMAND_2)));
logDummyCommandToAuditLog(logger, DUMMY_COMMAND_1);
logDummyCommandToAuditLog(logger, DUMMY_COMMAND_2);
verify(logger, never()).logAuditMessage(anyString());
}
@Test
public void testMultipleDebugCommandsLoggedAtDebug() {
DefaultAuditLogger logger =
makeSpyLogger(
Level.DEBUG,
Optional.of(Arrays.asList(DUMMY_COMMAND_1, DUMMY_COMMAND_2)));
logDummyCommandToAuditLog(logger, DUMMY_COMMAND_1);
logDummyCommandToAuditLog(logger, DUMMY_COMMAND_2);
verify(logger, times(2)).logAuditMessage(anyString());
}
@Test
public void testEmptyDebugCommands() {
DefaultAuditLogger logger = makeSpyLogger(
Level.INFO, Optional.<List<String>>absent());
logDummyCommandToAuditLog(logger, DUMMY_COMMAND_1);
logDummyCommandToAuditLog(logger, DUMMY_COMMAND_2);
verify(logger, times(2)).logAuditMessage(anyString());
}
}