HDFS-16266. Add remote port information to HDFS audit log (#3538)
Reviewed-by: Akira Ajisaka <aajisaka@apache.org>
Reviewed-by: Wei-Chiu Chuang <weichiu@apache.org>
Signed-off-by: Takanobu Asanuma <tasanuma@apache.org>
Cherry-picked from 359b03c8
by Owen O'Malley
This commit is contained in:
parent
2479d4ab6c
commit
f9d40ed7b7
@ -369,11 +369,20 @@ public static int getCallRetryCount() {
|
||||
}
|
||||
|
||||
/** Returns the remote side ip address when invoked inside an RPC
|
||||
* Returns null incase of an error.
|
||||
* Returns null in case of an error.
|
||||
*/
|
||||
public static InetAddress getRemoteIp() {
|
||||
Call call = CurCall.get();
|
||||
return (call != null ) ? call.getHostInetAddress() : null;
|
||||
return (call != null) ? call.getHostInetAddress() : null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the remote side port when invoked inside an RPC
|
||||
* Returns 0 in case of an error.
|
||||
*/
|
||||
public static int getRemotePort() {
|
||||
Call call = CurCall.get();
|
||||
return (call != null) ? call.getRemotePort() : 0;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -409,7 +418,7 @@ public static byte[] getClientId() {
|
||||
Call call = CurCall.get();
|
||||
return call != null ? call.clientId : RpcConstants.DUMMY_CLIENT_ID;
|
||||
}
|
||||
|
||||
|
||||
/** Returns remote address as a string when invoked inside an RPC.
|
||||
* Returns null in case of an error.
|
||||
*/
|
||||
@ -447,7 +456,7 @@ public static int getPriorityLevel() {
|
||||
return call != null? call.getPriorityLevel() : 0;
|
||||
}
|
||||
|
||||
private String bindAddress;
|
||||
private String bindAddress;
|
||||
private int port; // port we listen on
|
||||
private int handlerCount; // number of handler threads
|
||||
private int readThreads; // number of read threads
|
||||
@ -455,7 +464,7 @@ public static int getPriorityLevel() {
|
||||
private Class<? extends Writable> rpcRequestClass; // class used for deserializing the rpc request
|
||||
final protected RpcMetrics rpcMetrics;
|
||||
final protected RpcDetailedMetrics rpcDetailedMetrics;
|
||||
|
||||
|
||||
private Configuration conf;
|
||||
private String portRangeConfig = null;
|
||||
private SecretManager<TokenIdentifier> secretManager;
|
||||
@ -871,6 +880,9 @@ public UserGroupInformation getRemoteUser() {
|
||||
public InetAddress getHostInetAddress() {
|
||||
return null;
|
||||
}
|
||||
public int getRemotePort() {
|
||||
return 0;
|
||||
}
|
||||
public String getHostAddress() {
|
||||
InetAddress addr = getHostInetAddress();
|
||||
return (addr != null) ? addr.getHostAddress() : null;
|
||||
@ -1028,6 +1040,11 @@ public InetAddress getHostInetAddress() {
|
||||
return connection.getHostInetAddress();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getRemotePort() {
|
||||
return connection.getRemotePort();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Void run() throws Exception {
|
||||
if (!connection.channel.isOpen()) {
|
||||
@ -1909,6 +1926,10 @@ public int getIngressPort() {
|
||||
return ingressPort;
|
||||
}
|
||||
|
||||
public int getRemotePort() {
|
||||
return remotePort;
|
||||
}
|
||||
|
||||
public InetAddress getHostInetAddress() {
|
||||
return addr;
|
||||
}
|
||||
|
@ -707,6 +707,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
||||
"dfs.datanode.metrics.logger.period.seconds";
|
||||
public static final int DFS_DATANODE_METRICS_LOGGER_PERIOD_SECONDS_DEFAULT =
|
||||
600;
|
||||
public static final String DFS_NAMENODE_AUDIT_LOG_WITH_REMOTE_PORT_KEY =
|
||||
"dfs.namenode.audit.log.with.remote.port";
|
||||
public static final boolean DFS_NAMENODE_AUDIT_LOG_WITH_REMOTE_PORT_DEFAULT =
|
||||
false;
|
||||
/**
|
||||
* The maximum number of getBlocks RPCs data movement utilities can make to
|
||||
* a NameNode per second. Values <= 0 disable throttling. This affects
|
||||
|
@ -198,6 +198,9 @@ public static String getRemoteAddr(HttpServletRequest request) {
|
||||
return remoteAddr;
|
||||
}
|
||||
|
||||
public static int getRemotePort(HttpServletRequest request) {
|
||||
return request.getRemotePort();
|
||||
}
|
||||
|
||||
/**
|
||||
* Expected user name should be a short name.
|
||||
|
@ -17,14 +17,13 @@
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.namenode;
|
||||
|
||||
import java.net.InetAddress;
|
||||
import java.security.Principal;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
|
||||
import java.net.InetAddress;
|
||||
|
||||
/**
|
||||
* Interface defining an audit logger.
|
||||
*/
|
||||
|
@ -24,12 +24,16 @@
|
||||
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_ENABLED_KEY;
|
||||
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_MAX_SIZE_DEFAULT;
|
||||
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_MAX_SIZE_KEY;
|
||||
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_SEPARATOR_DEFAULT;
|
||||
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_SEPARATOR_KEY;
|
||||
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_SIGNATURE_MAX_SIZE_DEFAULT;
|
||||
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_SIGNATURE_MAX_SIZE_KEY;
|
||||
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
|
||||
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_WITH_REMOTE_PORT_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_WITH_REMOTE_PORT_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_PERMISSIONS_SUPERUSER_ONLY_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_PERMISSIONS_SUPERUSER_ONLY_KEY;
|
||||
@ -383,6 +387,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
||||
@Metric final MutableRatesWithAggregation detailedLockHoldTimeMetrics =
|
||||
registry.newRatesWithAggregation("detailedLockHoldTimeMetrics");
|
||||
|
||||
private static final String CLIENT_PORT_STR = "clientPort";
|
||||
private final String contextFieldSeparator;
|
||||
|
||||
boolean isAuditEnabled() {
|
||||
return (!isDefaultAuditLogger || auditLog.isInfoEnabled())
|
||||
&& !auditLoggers.isEmpty();
|
||||
@ -397,7 +404,7 @@ private void logAuditEvent(boolean succeeded, String cmd, String src,
|
||||
String dst, FileStatus stat) throws IOException {
|
||||
if (isAuditEnabled() && isExternalInvocation()) {
|
||||
logAuditEvent(succeeded, Server.getRemoteUser(), Server.getRemoteIp(),
|
||||
cmd, src, dst, stat);
|
||||
cmd, src, dst, stat);
|
||||
}
|
||||
}
|
||||
|
||||
@ -428,6 +435,9 @@ private void logAuditEvent(boolean succeeded,
|
||||
for (AuditLogger logger : auditLoggers) {
|
||||
if (logger instanceof HdfsAuditLogger) {
|
||||
HdfsAuditLogger hdfsLogger = (HdfsAuditLogger) logger;
|
||||
if (auditLogWithRemotePort) {
|
||||
appendClientPortToCallerContextIfAbsent();
|
||||
}
|
||||
hdfsLogger.logAuditEvent(succeeded, ugiStr, addr, cmd, src, dst,
|
||||
status, CallerContext.getCurrent(), ugi, dtSecretManager);
|
||||
} else {
|
||||
@ -436,6 +446,25 @@ private void logAuditEvent(boolean succeeded,
|
||||
}
|
||||
}
|
||||
|
||||
private void appendClientPortToCallerContextIfAbsent() {
|
||||
final CallerContext ctx = CallerContext.getCurrent();
|
||||
if (isClientPortInfoAbsent(CLIENT_PORT_STR + ":" + Server.getRemotePort(),
|
||||
ctx)) {
|
||||
String origContext = ctx == null ? null : ctx.getContext();
|
||||
byte[] origSignature = ctx == null ? null : ctx.getSignature();
|
||||
CallerContext.setCurrent(
|
||||
new CallerContext.Builder(origContext, contextFieldSeparator)
|
||||
.append(CLIENT_PORT_STR, String.valueOf(Server.getRemotePort()))
|
||||
.setSignature(origSignature)
|
||||
.build());
|
||||
}
|
||||
}
|
||||
|
||||
private boolean isClientPortInfoAbsent(String clientPortInfo, CallerContext ctx){
|
||||
return ctx == null || ctx.getContext() == null
|
||||
|| !ctx.getContext().contains(clientPortInfo);
|
||||
}
|
||||
|
||||
/**
|
||||
* Logger for audit events, noting successful FSNamesystem operations. Emits
|
||||
* to FSNamesystem.audit at INFO. Each event causes a set of tab-separated
|
||||
@ -486,6 +515,7 @@ private void logAuditEvent(boolean succeeded,
|
||||
// underlying logger is disabled, and avoid some unnecessary work.
|
||||
private final boolean isDefaultAuditLogger;
|
||||
private final List<AuditLogger> auditLoggers;
|
||||
private final boolean auditLogWithRemotePort;
|
||||
|
||||
/** The namespace tree. */
|
||||
FSDirectory dir;
|
||||
@ -817,6 +847,12 @@ static FSNamesystem loadFromDisk(Configuration conf) throws IOException {
|
||||
LOG.info("Enabling async auditlog");
|
||||
enableAsyncAuditLog(conf);
|
||||
}
|
||||
auditLogWithRemotePort =
|
||||
conf.getBoolean(DFS_NAMENODE_AUDIT_LOG_WITH_REMOTE_PORT_KEY,
|
||||
DFS_NAMENODE_AUDIT_LOG_WITH_REMOTE_PORT_DEFAULT);
|
||||
this.contextFieldSeparator =
|
||||
conf.get(HADOOP_CALLER_CONTEXT_SEPARATOR_KEY,
|
||||
HADOOP_CALLER_CONTEXT_SEPARATOR_DEFAULT);
|
||||
fsLock = new FSNamesystemLock(conf, detailedLockHoldTimeMetrics);
|
||||
cond = fsLock.newWriteLockCondition();
|
||||
cpLock = new ReentrantLock();
|
||||
|
@ -48,9 +48,9 @@ public void doGet(HttpServletRequest request, HttpServletResponse response
|
||||
@SuppressWarnings("unchecked")
|
||||
final Map<String,String[]> pmap = request.getParameterMap();
|
||||
final PrintWriter out = response.getWriter();
|
||||
final InetAddress remoteAddress =
|
||||
final InetAddress remoteAddress =
|
||||
InetAddress.getByName(request.getRemoteAddr());
|
||||
final ServletContext context = getServletContext();
|
||||
final ServletContext context = getServletContext();
|
||||
final Configuration conf = NameNodeHttpServer.getConfFromContext(context);
|
||||
|
||||
final UserGroupInformation ugi = getUGI(request, conf);
|
||||
|
@ -17,8 +17,6 @@
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.namenode;
|
||||
|
||||
import java.net.InetAddress;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
@ -26,6 +24,8 @@
|
||||
import org.apache.hadoop.ipc.CallerContext;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
|
||||
import java.net.InetAddress;
|
||||
|
||||
/**
|
||||
* Extension of {@link AuditLogger}.
|
||||
*/
|
||||
|
@ -131,6 +131,7 @@ public class NamenodeWebHdfsMethods {
|
||||
private String scheme;
|
||||
private Principal userPrincipal;
|
||||
private String remoteAddr;
|
||||
private int remotePort;
|
||||
|
||||
private @Context ServletContext context;
|
||||
private @Context HttpServletResponse response;
|
||||
@ -145,6 +146,7 @@ public NamenodeWebHdfsMethods(@Context HttpServletRequest request) {
|
||||
// get the remote address, if coming in via a trusted proxy server then
|
||||
// the address with be that of the proxied client
|
||||
remoteAddr = JspHelper.getRemoteAddr(request);
|
||||
remotePort = JspHelper.getRemotePort(request);
|
||||
supportEZ =
|
||||
Boolean.valueOf(request.getHeader(WebHdfsFileSystem.EZ_HEADER));
|
||||
}
|
||||
@ -223,6 +225,10 @@ public String getHostAddress() {
|
||||
return getRemoteAddr();
|
||||
}
|
||||
@Override
|
||||
public int getRemotePort() {
|
||||
return getRemotePortFromJSPHelper();
|
||||
}
|
||||
@Override
|
||||
public InetAddress getHostInetAddress() {
|
||||
try {
|
||||
return InetAddress.getByName(getHostAddress());
|
||||
@ -253,6 +259,10 @@ protected String getRemoteAddr() {
|
||||
return remoteAddr;
|
||||
}
|
||||
|
||||
protected int getRemotePortFromJSPHelper() {
|
||||
return remotePort;
|
||||
}
|
||||
|
||||
protected void queueExternalCall(ExternalCall call)
|
||||
throws IOException, InterruptedException {
|
||||
final NameNode namenode = (NameNode)context.getAttribute("name.node");
|
||||
|
@ -5013,6 +5013,14 @@
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.namenode.audit.log.with.remote.port</name>
|
||||
<value>false</value>
|
||||
<description>
|
||||
If true, adds a port of RPC call to callerContext for all audit log events.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.namenode.available-space-block-placement-policy.balanced-space-preference-fraction</name>
|
||||
<value>0.6</value>
|
||||
|
@ -55,6 +55,7 @@
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.List;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_ENABLED_KEY;
|
||||
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_MAX_SIZE_KEY;
|
||||
@ -69,6 +70,7 @@
|
||||
import static org.apache.hadoop.fs.permission.FsAction.READ_EXECUTE;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AUDIT_LOGGERS_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_WITH_REMOTE_PORT_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.NNTOP_ENABLED_KEY;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
@ -89,6 +91,20 @@ public class TestAuditLogger {
|
||||
}
|
||||
|
||||
private static final short TEST_PERMISSION = (short) 0654;
|
||||
private static final Pattern AUDIT_PATTERN = Pattern.compile(
|
||||
".*allowed=.*?\\s" +
|
||||
"ugi=.*?\\s" +
|
||||
"ip=/\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\s" +
|
||||
"cmd=.*?\\ssrc=.*?\\sdst=null\\s" +
|
||||
"perm=.*?");
|
||||
private static final Pattern AUDIT_WITH_PORT_PATTERN = Pattern.compile(
|
||||
".*allowed=.*?\\s" +
|
||||
"ugi=.*?\\s" +
|
||||
"ip=/\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\s" +
|
||||
"cmd=.*?\\ssrc=.*?\\sdst=null\\s" +
|
||||
"perm=.*?" +
|
||||
"proto=.*?" +
|
||||
"callerContext=.*?clientPort\\:(\\d{0,9}).*?");
|
||||
|
||||
@Before
|
||||
public void setup() {
|
||||
@ -544,6 +560,45 @@ public void testBrokenLogger() throws IOException {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test adding remote port to audit log.
|
||||
*/
|
||||
@Test
|
||||
public void testAuditLogWithRemotePort() throws Exception {
|
||||
// Audit log without remote port by default.
|
||||
Configuration conf = new HdfsConfiguration();
|
||||
MiniDFSCluster cluster1 = new MiniDFSCluster.Builder(conf).build();
|
||||
try {
|
||||
LogCapturer auditLog = LogCapturer.captureLogs(FSNamesystem.auditLog);
|
||||
cluster1.waitClusterUp();
|
||||
FileSystem fs = cluster1.getFileSystem();
|
||||
long time = System.currentTimeMillis();
|
||||
fs.setTimes(new Path("/"), time, time);
|
||||
assertTrue(AUDIT_PATTERN.matcher(auditLog.getOutput().trim()).matches());
|
||||
assertFalse(auditLog.getOutput().contains("clientPort"));
|
||||
auditLog.clearOutput();
|
||||
} finally {
|
||||
cluster1.shutdown();
|
||||
}
|
||||
|
||||
// Audit log with remote port.
|
||||
conf.setBoolean(DFS_NAMENODE_AUDIT_LOG_WITH_REMOTE_PORT_KEY, true);
|
||||
conf.setBoolean(HADOOP_CALLER_CONTEXT_ENABLED_KEY, true);
|
||||
MiniDFSCluster cluster2 = new MiniDFSCluster.Builder(conf).build();
|
||||
try {
|
||||
LogCapturer auditLog = LogCapturer.captureLogs(FSNamesystem.auditLog);
|
||||
cluster2.waitClusterUp();
|
||||
FileSystem fs = cluster2.getFileSystem();
|
||||
long time = System.currentTimeMillis();
|
||||
fs.setTimes(new Path("/"), time, time);
|
||||
assertTrue(AUDIT_WITH_PORT_PATTERN.matcher(
|
||||
auditLog.getOutput().trim()).matches());
|
||||
auditLog.clearOutput();
|
||||
} finally {
|
||||
cluster2.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
public static class DummyAuditLogger implements AuditLogger {
|
||||
|
||||
static boolean initialized;
|
||||
|
@ -90,9 +90,9 @@ public TestAuditLogs(boolean useAsyncLog, boolean useAsyncEdits) {
|
||||
// allowed=(true|false) ugi=name ip=/address cmd={cmd} src={path} dst=null perm=null
|
||||
static final Pattern auditPattern = Pattern.compile(
|
||||
"allowed=.*?\\s" +
|
||||
"ugi=.*?\\s" +
|
||||
"ip=/\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\s" +
|
||||
"cmd=.*?\\ssrc=.*?\\sdst=null\\s" +
|
||||
"ugi=.*?\\s" +
|
||||
"ip=/\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\s" +
|
||||
"cmd=.*?\\ssrc=.*?\\sdst=null\\s" +
|
||||
"perm=.*?");
|
||||
static final Pattern successPattern = Pattern.compile(
|
||||
".*allowed=true.*");
|
||||
|
@ -145,14 +145,14 @@ public class TestFsck {
|
||||
// allowed=true ugi=name ip=/address cmd=FSCK src=/ dst=null perm=null
|
||||
static final Pattern FSCK_PATTERN = Pattern.compile(
|
||||
"allowed=.*?\\s" +
|
||||
"ugi=.*?\\s" +
|
||||
"ip=/\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\s" +
|
||||
"ugi=.*?\\s" +
|
||||
"ip=/\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\s" +
|
||||
"cmd=fsck\\ssrc=\\/\\sdst=null\\s" +
|
||||
"perm=null\\s" + "proto=.*");
|
||||
static final Pattern GET_FILE_INFO_PATTERN = Pattern.compile(
|
||||
"allowed=.*?\\s" +
|
||||
"ugi=.*?\\s" +
|
||||
"ip=/\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\s" +
|
||||
"ugi=.*?\\s" +
|
||||
"ip=/\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\s" +
|
||||
"cmd=getfileinfo\\ssrc=\\/\\sdst=null\\s" +
|
||||
"perm=null\\s" + "proto=.*");
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user