HDFS-16266. Add remote port information to HDFS audit log (#3538)

Reviewed-by: Akira Ajisaka <aajisaka@apache.org>
Reviewed-by: Wei-Chiu Chuang <weichiu@apache.org>
Signed-off-by: Takanobu Asanuma <tasanuma@apache.org>
This commit is contained in:
litao 2021-11-04 09:16:03 +08:00 committed by GitHub
parent a21895a5b3
commit 359b03c83e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 156 additions and 20 deletions

View File

@ -369,11 +369,20 @@ public static int getCallRetryCount() {
} }
/** Returns the remote side ip address when invoked inside an RPC /** Returns the remote side ip address when invoked inside an RPC
* Returns null incase of an error. * Returns null in case of an error.
*/ */
public static InetAddress getRemoteIp() { public static InetAddress getRemoteIp() {
Call call = CurCall.get(); Call call = CurCall.get();
return (call != null ) ? call.getHostInetAddress() : null; return (call != null) ? call.getHostInetAddress() : null;
}
/**
* Returns the remote side port when invoked inside an RPC
* Returns 0 in case of an error.
*/
public static int getRemotePort() {
Call call = CurCall.get();
return (call != null) ? call.getRemotePort() : 0;
} }
/** /**
@ -973,6 +982,9 @@ public UserGroupInformation getRemoteUser() {
public InetAddress getHostInetAddress() { public InetAddress getHostInetAddress() {
return null; return null;
} }
public int getRemotePort() {
return 0;
}
public String getHostAddress() { public String getHostAddress() {
InetAddress addr = getHostInetAddress(); InetAddress addr = getHostInetAddress();
return (addr != null) ? addr.getHostAddress() : null; return (addr != null) ? addr.getHostAddress() : null;
@ -1130,6 +1142,11 @@ public InetAddress getHostInetAddress() {
return connection.getHostInetAddress(); return connection.getHostInetAddress();
} }
@Override
public int getRemotePort() {
return connection.getRemotePort();
}
@Override @Override
public Void run() throws Exception { public Void run() throws Exception {
if (!connection.channel.isOpen()) { if (!connection.channel.isOpen()) {
@ -2011,6 +2028,10 @@ public int getIngressPort() {
return ingressPort; return ingressPort;
} }
public int getRemotePort() {
return remotePort;
}
public InetAddress getHostInetAddress() { public InetAddress getHostInetAddress() {
return addr; return addr;
} }

View File

@ -727,6 +727,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
"dfs.datanode.metrics.logger.period.seconds"; "dfs.datanode.metrics.logger.period.seconds";
public static final int DFS_DATANODE_METRICS_LOGGER_PERIOD_SECONDS_DEFAULT = public static final int DFS_DATANODE_METRICS_LOGGER_PERIOD_SECONDS_DEFAULT =
600; 600;
public static final String DFS_NAMENODE_AUDIT_LOG_WITH_REMOTE_PORT_KEY =
"dfs.namenode.audit.log.with.remote.port";
public static final boolean DFS_NAMENODE_AUDIT_LOG_WITH_REMOTE_PORT_DEFAULT =
false;
/** /**
* The maximum number of getBlocks RPCs data movement utilities can make to * The maximum number of getBlocks RPCs data movement utilities can make to
* a NameNode per second. Values &lt;= 0 disable throttling. This affects * a NameNode per second. Values &lt;= 0 disable throttling. This affects

View File

@ -198,6 +198,9 @@ public static String getRemoteAddr(HttpServletRequest request) {
return remoteAddr; return remoteAddr;
} }
public static int getRemotePort(HttpServletRequest request) {
return request.getRemotePort();
}
/** /**
* Expected user name should be a short name. * Expected user name should be a short name.

View File

@ -17,14 +17,13 @@
*/ */
package org.apache.hadoop.hdfs.server.namenode; package org.apache.hadoop.hdfs.server.namenode;
import java.net.InetAddress;
import java.security.Principal;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
import java.net.InetAddress;
/** /**
* Interface defining an audit logger. * Interface defining an audit logger.
*/ */

View File

@ -24,12 +24,16 @@
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_ENABLED_KEY; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_ENABLED_KEY;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_MAX_SIZE_DEFAULT; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_MAX_SIZE_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_MAX_SIZE_KEY; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_MAX_SIZE_KEY;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_SEPARATOR_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_SEPARATOR_KEY;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_SIGNATURE_MAX_SIZE_DEFAULT; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_SIGNATURE_MAX_SIZE_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_SIGNATURE_MAX_SIZE_KEY; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_SIGNATURE_MAX_SIZE_KEY;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_WITH_REMOTE_PORT_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_WITH_REMOTE_PORT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_PERMISSIONS_SUPERUSER_ONLY_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_PERMISSIONS_SUPERUSER_ONLY_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_PERMISSIONS_SUPERUSER_ONLY_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_PERMISSIONS_SUPERUSER_ONLY_KEY;
@ -397,6 +401,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
@Metric final MutableRatesWithAggregation detailedLockHoldTimeMetrics = @Metric final MutableRatesWithAggregation detailedLockHoldTimeMetrics =
registry.newRatesWithAggregation("detailedLockHoldTimeMetrics"); registry.newRatesWithAggregation("detailedLockHoldTimeMetrics");
private static final String CLIENT_PORT_STR = "clientPort";
private final String contextFieldSeparator;
boolean isAuditEnabled() { boolean isAuditEnabled() {
return (!isDefaultAuditLogger || auditLog.isInfoEnabled()) return (!isDefaultAuditLogger || auditLog.isInfoEnabled())
&& !auditLoggers.isEmpty(); && !auditLoggers.isEmpty();
@ -442,6 +449,9 @@ private void logAuditEvent(boolean succeeded,
for (AuditLogger logger : auditLoggers) { for (AuditLogger logger : auditLoggers) {
if (logger instanceof HdfsAuditLogger) { if (logger instanceof HdfsAuditLogger) {
HdfsAuditLogger hdfsLogger = (HdfsAuditLogger) logger; HdfsAuditLogger hdfsLogger = (HdfsAuditLogger) logger;
if (auditLogWithRemotePort) {
appendClientPortToCallerContextIfAbsent();
}
hdfsLogger.logAuditEvent(succeeded, ugiStr, addr, cmd, src, dst, hdfsLogger.logAuditEvent(succeeded, ugiStr, addr, cmd, src, dst,
status, CallerContext.getCurrent(), ugi, dtSecretManager); status, CallerContext.getCurrent(), ugi, dtSecretManager);
} else { } else {
@ -450,6 +460,25 @@ private void logAuditEvent(boolean succeeded,
} }
} }
private void appendClientPortToCallerContextIfAbsent() {
final CallerContext ctx = CallerContext.getCurrent();
if (isClientPortInfoAbsent(CLIENT_PORT_STR + ":" + Server.getRemotePort(),
ctx)) {
String origContext = ctx == null ? null : ctx.getContext();
byte[] origSignature = ctx == null ? null : ctx.getSignature();
CallerContext.setCurrent(
new CallerContext.Builder(origContext, contextFieldSeparator)
.append(CLIENT_PORT_STR, String.valueOf(Server.getRemotePort()))
.setSignature(origSignature)
.build());
}
}
private boolean isClientPortInfoAbsent(String clientPortInfo, CallerContext ctx){
return ctx == null || ctx.getContext() == null
|| !ctx.getContext().contains(clientPortInfo);
}
/** /**
* Logger for audit events, noting successful FSNamesystem operations. Emits * Logger for audit events, noting successful FSNamesystem operations. Emits
* to FSNamesystem.audit at INFO. Each event causes a set of tab-separated * to FSNamesystem.audit at INFO. Each event causes a set of tab-separated
@ -501,6 +530,7 @@ private void logAuditEvent(boolean succeeded,
// underlying logger is disabled, and avoid some unnecessary work. // underlying logger is disabled, and avoid some unnecessary work.
private final boolean isDefaultAuditLogger; private final boolean isDefaultAuditLogger;
private final List<AuditLogger> auditLoggers; private final List<AuditLogger> auditLoggers;
private final boolean auditLogWithRemotePort;
/** The namespace tree. */ /** The namespace tree. */
FSDirectory dir; FSDirectory dir;
@ -833,6 +863,12 @@ static FSNamesystem loadFromDisk(Configuration conf) throws IOException {
LOG.info("Enabling async auditlog"); LOG.info("Enabling async auditlog");
enableAsyncAuditLog(conf); enableAsyncAuditLog(conf);
} }
auditLogWithRemotePort =
conf.getBoolean(DFS_NAMENODE_AUDIT_LOG_WITH_REMOTE_PORT_KEY,
DFS_NAMENODE_AUDIT_LOG_WITH_REMOTE_PORT_DEFAULT);
this.contextFieldSeparator =
conf.get(HADOOP_CALLER_CONTEXT_SEPARATOR_KEY,
HADOOP_CALLER_CONTEXT_SEPARATOR_DEFAULT);
fsLock = new FSNamesystemLock(conf, detailedLockHoldTimeMetrics); fsLock = new FSNamesystemLock(conf, detailedLockHoldTimeMetrics);
cond = fsLock.newWriteLockCondition(); cond = fsLock.newWriteLockCondition();
cpLock = new ReentrantLock(); cpLock = new ReentrantLock();

View File

@ -17,8 +17,6 @@
*/ */
package org.apache.hadoop.hdfs.server.namenode; package org.apache.hadoop.hdfs.server.namenode;
import java.net.InetAddress;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
@ -26,6 +24,8 @@
import org.apache.hadoop.ipc.CallerContext; import org.apache.hadoop.ipc.CallerContext;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import java.net.InetAddress;
/** /**
* Extension of {@link AuditLogger}. * Extension of {@link AuditLogger}.
*/ */

View File

@ -133,6 +133,7 @@ public class NamenodeWebHdfsMethods {
private String scheme; private String scheme;
private Principal userPrincipal; private Principal userPrincipal;
private String remoteAddr; private String remoteAddr;
private int remotePort;
private @Context ServletContext context; private @Context ServletContext context;
private @Context HttpServletResponse response; private @Context HttpServletResponse response;
@ -147,6 +148,7 @@ public NamenodeWebHdfsMethods(@Context HttpServletRequest request) {
// get the remote address, if coming in via a trusted proxy server then // get the remote address, if coming in via a trusted proxy server then
// the address with be that of the proxied client // the address with be that of the proxied client
remoteAddr = JspHelper.getRemoteAddr(request); remoteAddr = JspHelper.getRemoteAddr(request);
remotePort = JspHelper.getRemotePort(request);
supportEZ = supportEZ =
Boolean.valueOf(request.getHeader(WebHdfsFileSystem.EZ_HEADER)); Boolean.valueOf(request.getHeader(WebHdfsFileSystem.EZ_HEADER));
} }
@ -225,6 +227,10 @@ public String getHostAddress() {
return getRemoteAddr(); return getRemoteAddr();
} }
@Override @Override
public int getRemotePort() {
return getRemotePortFromJSPHelper();
}
@Override
public InetAddress getHostInetAddress() { public InetAddress getHostInetAddress() {
try { try {
return InetAddress.getByName(getHostAddress()); return InetAddress.getByName(getHostAddress());
@ -255,6 +261,10 @@ protected String getRemoteAddr() {
return remoteAddr; return remoteAddr;
} }
protected int getRemotePortFromJSPHelper() {
return remotePort;
}
protected void queueExternalCall(ExternalCall call) protected void queueExternalCall(ExternalCall call)
throws IOException, InterruptedException { throws IOException, InterruptedException {
final NameNode namenode = (NameNode)context.getAttribute("name.node"); final NameNode namenode = (NameNode)context.getAttribute("name.node");

View File

@ -5056,6 +5056,14 @@
</description> </description>
</property> </property>
<property>
<name>dfs.namenode.audit.log.with.remote.port</name>
<value>false</value>
<description>
If true, adds a port of RPC call to callerContext for all audit log events.
</description>
</property>
<property> <property>
<name>dfs.namenode.available-space-block-placement-policy.balanced-space-preference-fraction</name> <name>dfs.namenode.available-space-block-placement-policy.balanced-space-preference-fraction</name>
<value>0.6</value> <value>0.6</value>

View File

@ -55,6 +55,7 @@
import java.net.URI; import java.net.URI;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.util.List; import java.util.List;
import java.util.regex.Pattern;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_ENABLED_KEY; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_ENABLED_KEY;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_MAX_SIZE_KEY; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_MAX_SIZE_KEY;
@ -69,6 +70,7 @@
import static org.apache.hadoop.fs.permission.FsAction.READ_EXECUTE; import static org.apache.hadoop.fs.permission.FsAction.READ_EXECUTE;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AUDIT_LOGGERS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AUDIT_LOGGERS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_WITH_REMOTE_PORT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.NNTOP_ENABLED_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.NNTOP_ENABLED_KEY;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
@ -89,6 +91,20 @@ public class TestAuditLogger {
} }
private static final short TEST_PERMISSION = (short) 0654; private static final short TEST_PERMISSION = (short) 0654;
private static final Pattern AUDIT_PATTERN = Pattern.compile(
".*allowed=.*?\\s" +
"ugi=.*?\\s" +
"ip=/\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\s" +
"cmd=.*?\\ssrc=.*?\\sdst=null\\s" +
"perm=.*?");
private static final Pattern AUDIT_WITH_PORT_PATTERN = Pattern.compile(
".*allowed=.*?\\s" +
"ugi=.*?\\s" +
"ip=/\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\s" +
"cmd=.*?\\ssrc=.*?\\sdst=null\\s" +
"perm=.*?" +
"proto=.*?" +
"callerContext=.*?clientPort\\:(\\d{0,9}).*?");
@Before @Before
public void setup() { public void setup() {
@ -544,6 +560,45 @@ public void testBrokenLogger() throws IOException {
} }
} }
/**
* Test adding remote port to audit log.
*/
@Test
public void testAuditLogWithRemotePort() throws Exception {
// Audit log without remote port by default.
Configuration conf = new HdfsConfiguration();
MiniDFSCluster cluster1 = new MiniDFSCluster.Builder(conf).build();
try {
LogCapturer auditLog = LogCapturer.captureLogs(FSNamesystem.auditLog);
cluster1.waitClusterUp();
FileSystem fs = cluster1.getFileSystem();
long time = System.currentTimeMillis();
fs.setTimes(new Path("/"), time, time);
assertTrue(AUDIT_PATTERN.matcher(auditLog.getOutput().trim()).matches());
assertFalse(auditLog.getOutput().contains("clientPort"));
auditLog.clearOutput();
} finally {
cluster1.shutdown();
}
// Audit log with remote port.
conf.setBoolean(DFS_NAMENODE_AUDIT_LOG_WITH_REMOTE_PORT_KEY, true);
conf.setBoolean(HADOOP_CALLER_CONTEXT_ENABLED_KEY, true);
MiniDFSCluster cluster2 = new MiniDFSCluster.Builder(conf).build();
try {
LogCapturer auditLog = LogCapturer.captureLogs(FSNamesystem.auditLog);
cluster2.waitClusterUp();
FileSystem fs = cluster2.getFileSystem();
long time = System.currentTimeMillis();
fs.setTimes(new Path("/"), time, time);
assertTrue(AUDIT_WITH_PORT_PATTERN.matcher(
auditLog.getOutput().trim()).matches());
auditLog.clearOutput();
} finally {
cluster2.shutdown();
}
}
public static class DummyAuditLogger implements AuditLogger { public static class DummyAuditLogger implements AuditLogger {
static boolean initialized; static boolean initialized;