HDFS-16266. Add remote port information to HDFS audit log (#3538)

Reviewed-by: Akira Ajisaka <aajisaka@apache.org>
Reviewed-by: Wei-Chiu Chuang <weichiu@apache.org>
Signed-off-by: Takanobu Asanuma <tasanuma@apache.org>
This commit is contained in:
litao 2021-11-04 09:16:03 +08:00 committed by GitHub
parent a21895a5b3
commit 359b03c83e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 156 additions and 20 deletions

View File

@ -369,11 +369,20 @@ public static int getCallRetryCount() {
}
/** Returns the remote side ip address when invoked inside an RPC
* Returns null incase of an error.
* Returns null in case of an error.
*/
public static InetAddress getRemoteIp() {
Call call = CurCall.get();
return (call != null ) ? call.getHostInetAddress() : null;
return (call != null) ? call.getHostInetAddress() : null;
}
/**
* Returns the remote side port when invoked inside an RPC
* Returns 0 in case of an error.
*/
public static int getRemotePort() {
Call call = CurCall.get();
return (call != null) ? call.getRemotePort() : 0;
}
/**
@ -409,7 +418,7 @@ public static byte[] getClientId() {
Call call = CurCall.get();
return call != null ? call.clientId : RpcConstants.DUMMY_CLIENT_ID;
}
/** Returns remote address as a string when invoked inside an RPC.
* Returns null in case of an error.
*/
@ -447,7 +456,7 @@ public static int getPriorityLevel() {
return call != null? call.getPriorityLevel() : 0;
}
private String bindAddress;
private String bindAddress;
private int port; // port we listen on
private int handlerCount; // number of handler threads
private int readThreads; // number of read threads
@ -455,7 +464,7 @@ public static int getPriorityLevel() {
private Class<? extends Writable> rpcRequestClass; // class used for deserializing the rpc request
final protected RpcMetrics rpcMetrics;
final protected RpcDetailedMetrics rpcDetailedMetrics;
private Configuration conf;
private String portRangeConfig = null;
private SecretManager<TokenIdentifier> secretManager;
@ -973,6 +982,9 @@ public UserGroupInformation getRemoteUser() {
public InetAddress getHostInetAddress() {
return null;
}
public int getRemotePort() {
return 0;
}
public String getHostAddress() {
InetAddress addr = getHostInetAddress();
return (addr != null) ? addr.getHostAddress() : null;
@ -1130,6 +1142,11 @@ public InetAddress getHostInetAddress() {
return connection.getHostInetAddress();
}
@Override
public int getRemotePort() {
return connection.getRemotePort();
}
@Override
public Void run() throws Exception {
if (!connection.channel.isOpen()) {
@ -2011,6 +2028,10 @@ public int getIngressPort() {
return ingressPort;
}
public int getRemotePort() {
return remotePort;
}
public InetAddress getHostInetAddress() {
return addr;
}

View File

@ -727,6 +727,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
"dfs.datanode.metrics.logger.period.seconds";
public static final int DFS_DATANODE_METRICS_LOGGER_PERIOD_SECONDS_DEFAULT =
600;
public static final String DFS_NAMENODE_AUDIT_LOG_WITH_REMOTE_PORT_KEY =
"dfs.namenode.audit.log.with.remote.port";
public static final boolean DFS_NAMENODE_AUDIT_LOG_WITH_REMOTE_PORT_DEFAULT =
false;
/**
* The maximum number of getBlocks RPCs data movement utilities can make to
* a NameNode per second. Values &lt;= 0 disable throttling. This affects

View File

@ -198,6 +198,9 @@ public static String getRemoteAddr(HttpServletRequest request) {
return remoteAddr;
}
public static int getRemotePort(HttpServletRequest request) {
return request.getRemotePort();
}
/**
* Expected user name should be a short name.

View File

@ -17,14 +17,13 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
import java.net.InetAddress;
import java.security.Principal;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import java.net.InetAddress;
/**
* Interface defining an audit logger.
*/

View File

@ -24,12 +24,16 @@
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_ENABLED_KEY;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_MAX_SIZE_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_MAX_SIZE_KEY;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_SEPARATOR_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_SEPARATOR_KEY;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_SIGNATURE_MAX_SIZE_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_SIGNATURE_MAX_SIZE_KEY;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_WITH_REMOTE_PORT_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_WITH_REMOTE_PORT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_PERMISSIONS_SUPERUSER_ONLY_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_PERMISSIONS_SUPERUSER_ONLY_KEY;
@ -397,6 +401,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
@Metric final MutableRatesWithAggregation detailedLockHoldTimeMetrics =
registry.newRatesWithAggregation("detailedLockHoldTimeMetrics");
private static final String CLIENT_PORT_STR = "clientPort";
private final String contextFieldSeparator;
boolean isAuditEnabled() {
return (!isDefaultAuditLogger || auditLog.isInfoEnabled())
&& !auditLoggers.isEmpty();
@ -411,7 +418,7 @@ private void logAuditEvent(boolean succeeded, String cmd, String src,
String dst, FileStatus stat) throws IOException {
if (isAuditEnabled() && isExternalInvocation()) {
logAuditEvent(succeeded, Server.getRemoteUser(), Server.getRemoteIp(),
cmd, src, dst, stat);
cmd, src, dst, stat);
}
}
@ -442,6 +449,9 @@ private void logAuditEvent(boolean succeeded,
for (AuditLogger logger : auditLoggers) {
if (logger instanceof HdfsAuditLogger) {
HdfsAuditLogger hdfsLogger = (HdfsAuditLogger) logger;
if (auditLogWithRemotePort) {
appendClientPortToCallerContextIfAbsent();
}
hdfsLogger.logAuditEvent(succeeded, ugiStr, addr, cmd, src, dst,
status, CallerContext.getCurrent(), ugi, dtSecretManager);
} else {
@ -450,6 +460,25 @@ private void logAuditEvent(boolean succeeded,
}
}
private void appendClientPortToCallerContextIfAbsent() {
final CallerContext ctx = CallerContext.getCurrent();
if (isClientPortInfoAbsent(CLIENT_PORT_STR + ":" + Server.getRemotePort(),
ctx)) {
String origContext = ctx == null ? null : ctx.getContext();
byte[] origSignature = ctx == null ? null : ctx.getSignature();
CallerContext.setCurrent(
new CallerContext.Builder(origContext, contextFieldSeparator)
.append(CLIENT_PORT_STR, String.valueOf(Server.getRemotePort()))
.setSignature(origSignature)
.build());
}
}
private boolean isClientPortInfoAbsent(String clientPortInfo, CallerContext ctx){
return ctx == null || ctx.getContext() == null
|| !ctx.getContext().contains(clientPortInfo);
}
/**
* Logger for audit events, noting successful FSNamesystem operations. Emits
* to FSNamesystem.audit at INFO. Each event causes a set of tab-separated
@ -501,6 +530,7 @@ private void logAuditEvent(boolean succeeded,
// underlying logger is disabled, and avoid some unnecessary work.
private final boolean isDefaultAuditLogger;
private final List<AuditLogger> auditLoggers;
private final boolean auditLogWithRemotePort;
/** The namespace tree. */
FSDirectory dir;
@ -833,6 +863,12 @@ static FSNamesystem loadFromDisk(Configuration conf) throws IOException {
LOG.info("Enabling async auditlog");
enableAsyncAuditLog(conf);
}
auditLogWithRemotePort =
conf.getBoolean(DFS_NAMENODE_AUDIT_LOG_WITH_REMOTE_PORT_KEY,
DFS_NAMENODE_AUDIT_LOG_WITH_REMOTE_PORT_DEFAULT);
this.contextFieldSeparator =
conf.get(HADOOP_CALLER_CONTEXT_SEPARATOR_KEY,
HADOOP_CALLER_CONTEXT_SEPARATOR_DEFAULT);
fsLock = new FSNamesystemLock(conf, detailedLockHoldTimeMetrics);
cond = fsLock.newWriteLockCondition();
cpLock = new ReentrantLock();

View File

@ -48,9 +48,9 @@ public void doGet(HttpServletRequest request, HttpServletResponse response
@SuppressWarnings("unchecked")
final Map<String,String[]> pmap = request.getParameterMap();
final PrintWriter out = response.getWriter();
final InetAddress remoteAddress =
final InetAddress remoteAddress =
InetAddress.getByName(request.getRemoteAddr());
final ServletContext context = getServletContext();
final ServletContext context = getServletContext();
final Configuration conf = NameNodeHttpServer.getConfFromContext(context);
final UserGroupInformation ugi = getUGI(request, conf);

View File

@ -17,8 +17,6 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
import java.net.InetAddress;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FileStatus;
@ -26,6 +24,8 @@
import org.apache.hadoop.ipc.CallerContext;
import org.apache.hadoop.security.UserGroupInformation;
import java.net.InetAddress;
/**
* Extension of {@link AuditLogger}.
*/

View File

@ -133,6 +133,7 @@ public class NamenodeWebHdfsMethods {
private String scheme;
private Principal userPrincipal;
private String remoteAddr;
private int remotePort;
private @Context ServletContext context;
private @Context HttpServletResponse response;
@ -147,6 +148,7 @@ public NamenodeWebHdfsMethods(@Context HttpServletRequest request) {
// get the remote address, if coming in via a trusted proxy server then
// the address with be that of the proxied client
remoteAddr = JspHelper.getRemoteAddr(request);
remotePort = JspHelper.getRemotePort(request);
supportEZ =
Boolean.valueOf(request.getHeader(WebHdfsFileSystem.EZ_HEADER));
}
@ -225,6 +227,10 @@ public String getHostAddress() {
return getRemoteAddr();
}
@Override
public int getRemotePort() {
return getRemotePortFromJSPHelper();
}
@Override
public InetAddress getHostInetAddress() {
try {
return InetAddress.getByName(getHostAddress());
@ -255,6 +261,10 @@ protected String getRemoteAddr() {
return remoteAddr;
}
protected int getRemotePortFromJSPHelper() {
return remotePort;
}
protected void queueExternalCall(ExternalCall call)
throws IOException, InterruptedException {
final NameNode namenode = (NameNode)context.getAttribute("name.node");

View File

@ -5056,6 +5056,14 @@
</description>
</property>
<property>
<name>dfs.namenode.audit.log.with.remote.port</name>
<value>false</value>
<description>
If true, adds a port of RPC call to callerContext for all audit log events.
</description>
</property>
<property>
<name>dfs.namenode.available-space-block-placement-policy.balanced-space-preference-fraction</name>
<value>0.6</value>

View File

@ -55,6 +55,7 @@
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
import java.util.regex.Pattern;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_ENABLED_KEY;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_MAX_SIZE_KEY;
@ -69,6 +70,7 @@
import static org.apache.hadoop.fs.permission.FsAction.READ_EXECUTE;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AUDIT_LOGGERS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_WITH_REMOTE_PORT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.NNTOP_ENABLED_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@ -89,6 +91,20 @@ public class TestAuditLogger {
}
private static final short TEST_PERMISSION = (short) 0654;
private static final Pattern AUDIT_PATTERN = Pattern.compile(
".*allowed=.*?\\s" +
"ugi=.*?\\s" +
"ip=/\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\s" +
"cmd=.*?\\ssrc=.*?\\sdst=null\\s" +
"perm=.*?");
private static final Pattern AUDIT_WITH_PORT_PATTERN = Pattern.compile(
".*allowed=.*?\\s" +
"ugi=.*?\\s" +
"ip=/\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\s" +
"cmd=.*?\\ssrc=.*?\\sdst=null\\s" +
"perm=.*?" +
"proto=.*?" +
"callerContext=.*?clientPort\\:(\\d{0,9}).*?");
@Before
public void setup() {
@ -544,6 +560,45 @@ public void testBrokenLogger() throws IOException {
}
}
/**
* Test adding remote port to audit log.
*/
@Test
public void testAuditLogWithRemotePort() throws Exception {
// Audit log without remote port by default.
Configuration conf = new HdfsConfiguration();
MiniDFSCluster cluster1 = new MiniDFSCluster.Builder(conf).build();
try {
LogCapturer auditLog = LogCapturer.captureLogs(FSNamesystem.auditLog);
cluster1.waitClusterUp();
FileSystem fs = cluster1.getFileSystem();
long time = System.currentTimeMillis();
fs.setTimes(new Path("/"), time, time);
assertTrue(AUDIT_PATTERN.matcher(auditLog.getOutput().trim()).matches());
assertFalse(auditLog.getOutput().contains("clientPort"));
auditLog.clearOutput();
} finally {
cluster1.shutdown();
}
// Audit log with remote port.
conf.setBoolean(DFS_NAMENODE_AUDIT_LOG_WITH_REMOTE_PORT_KEY, true);
conf.setBoolean(HADOOP_CALLER_CONTEXT_ENABLED_KEY, true);
MiniDFSCluster cluster2 = new MiniDFSCluster.Builder(conf).build();
try {
LogCapturer auditLog = LogCapturer.captureLogs(FSNamesystem.auditLog);
cluster2.waitClusterUp();
FileSystem fs = cluster2.getFileSystem();
long time = System.currentTimeMillis();
fs.setTimes(new Path("/"), time, time);
assertTrue(AUDIT_WITH_PORT_PATTERN.matcher(
auditLog.getOutput().trim()).matches());
auditLog.clearOutput();
} finally {
cluster2.shutdown();
}
}
public static class DummyAuditLogger implements AuditLogger {
static boolean initialized;

View File

@ -92,9 +92,9 @@ public TestAuditLogs(boolean useAsyncLog, boolean useAsyncEdits) {
// allowed=(true|false) ugi=name ip=/address cmd={cmd} src={path} dst=null perm=null
static final Pattern auditPattern = Pattern.compile(
"allowed=.*?\\s" +
"ugi=.*?\\s" +
"ip=/\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\s" +
"cmd=.*?\\ssrc=.*?\\sdst=null\\s" +
"ugi=.*?\\s" +
"ip=/\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\s" +
"cmd=.*?\\ssrc=.*?\\sdst=null\\s" +
"perm=.*?");
static final Pattern successPattern = Pattern.compile(
".*allowed=true.*");

View File

@ -144,14 +144,14 @@ public class TestFsck {
// allowed=true ugi=name ip=/address cmd=FSCK src=/ dst=null perm=null
static final Pattern FSCK_PATTERN = Pattern.compile(
"allowed=.*?\\s" +
"ugi=.*?\\s" +
"ip=/\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\s" +
"ugi=.*?\\s" +
"ip=/\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\s" +
"cmd=fsck\\ssrc=\\/\\sdst=null\\s" +
"perm=null\\s" + "proto=.*");
static final Pattern GET_FILE_INFO_PATTERN = Pattern.compile(
"allowed=.*?\\s" +
"ugi=.*?\\s" +
"ip=/\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\s" +
"ugi=.*?\\s" +
"ip=/\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\s" +
"cmd=getfileinfo\\ssrc=\\/\\sdst=null\\s" +
"perm=null\\s" + "proto=.*");