From 8e129e5b8d99d9efac4d2f8cc2e319155d062cfe Mon Sep 17 00:00:00 2001
From: Hui Fei
Date: Fri, 9 Oct 2020 16:12:22 +0800
Subject: [PATCH] HDFS-13293. RBF: The RouterRPCServer should transfer client IP via CallerContext to NamenodeRpcServer (#2363)

Cherry-picked from 518a212c by Owen O'Malley
---
 .../org/apache/hadoop/ipc/CallerContext.java  | 10 ++++++-
 .../federation/router/RouterRpcClient.java    | 25 +++++++++++++++++
 .../federation/router/TestRouterRpc.java      | 28 ++++++++++++++++++-
 3 files changed, 61 insertions(+), 2 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallerContext.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallerContext.java
index e15340c618..378b83d13b 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallerContext.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallerContext.java
@@ -129,7 +129,7 @@ public static final class Builder {
     private byte[] signature;
 
     public Builder(String context) {
-      this(context, new Configuration());
+      this(context, HADOOP_CALLER_CONTEXT_SEPARATOR_DEFAULT);
     }
 
     public Builder(String context, Configuration conf) {
@@ -141,6 +141,14 @@ public Builder(String context, Configuration conf) {
       checkFieldSeparator(fieldSeparator);
     }
 
+    public Builder(String context, String separator) {
+      if (isValid(context)) {
+        sb.append(context);
+      }
+      fieldSeparator = separator;
+      checkFieldSeparator(fieldSeparator);
+    }
+
     /**
      * Check whether the separator is legal.
      * The illegal separators include '\t', '\n', '='.
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
index aeb6461686..cf73638a20 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.hdfs.server.federation.router;
 
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_SEPARATOR_DEFAULT;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_SEPARATOR_KEY;
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY;
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_TIMEOUT_KEY;
 
@@ -66,8 +68,10 @@
 import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.io.retry.RetryPolicy.RetryAction.RetryDecision;
+import org.apache.hadoop.ipc.CallerContext;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.RetriableException;
+import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.ipc.StandbyException;
 import org.apache.hadoop.net.ConnectTimeoutException;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -115,11 +119,14 @@ public class RouterRpcClient {
   private final RetryPolicy retryPolicy;
   /** Optional perf monitor. */
   private final RouterRpcMonitor rpcMonitor;
+  /** Field separator of CallerContext. */
+  private final String contextFieldSeparator;
 
   /** Pattern to parse a stack trace line. */
   private static final Pattern STACK_TRACE_PATTERN =
       Pattern.compile("\\tat (.*)\\.(.*)\\((.*):(\\d*)\\)");
+  private static final String CLIENT_IP_STR = "clientIp";
 
   /**
    * Create a router RPC client to manage remote procedure calls to NNs.
@@ -136,6 +143,9 @@ public RouterRpcClient(Configuration conf, Router router,
     this.namenodeResolver = resolver;
 
     Configuration clientConf = getClientConfiguration(conf);
+    this.contextFieldSeparator =
+        clientConf.get(HADOOP_CALLER_CONTEXT_SEPARATOR_KEY,
+            HADOOP_CALLER_CONTEXT_SEPARATOR_DEFAULT);
     this.connectionManager = new ConnectionManager(clientConf);
     this.connectionManager.start();
 
@@ -404,6 +414,7 @@ private Object invokeMethod(
           " with params " + Arrays.deepToString(params) + " from "
           + router.getRouterId());
     }
+    appendClientIpToCallerContext();
 
     Object ret = null;
     if (rpcMonitor != null) {
@@ -519,6 +530,20 @@ private Object invokeMethod(
     }
   }
 
+  /**
+   * For tracking the actual client address.
+   * It adds a key/value pair (clientIp/"ip") to the caller context.
+   */
+  private void appendClientIpToCallerContext() {
+    final CallerContext ctx = CallerContext.getCurrent();
+    String origContext = ctx == null ? null : ctx.getContext();
+    byte[] origSignature = ctx == null ? null : ctx.getSignature();
+    CallerContext.setCurrent(
+        new CallerContext.Builder(origContext, contextFieldSeparator)
+            .append(CLIENT_IP_STR, Server.getRemoteAddress())
+            .setSignature(origSignature).build());
+  }
+
   /**
    * Invokes a method on the designated object. Catches exceptions specific to
    * the invocation.
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java
index 6c755ab3bb..ceb291dc16 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java
@@ -118,6 +118,7 @@
 import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.io.erasurecode.ECSchema;
 import org.apache.hadoop.io.erasurecode.ErasureCodeConstants;
+import org.apache.hadoop.ipc.CallerContext;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.service.Service.STATE;
 import org.apache.hadoop.test.GenericTestUtils;
@@ -196,6 +197,8 @@ public int compare(
   @BeforeClass
   public static void globalSetUp() throws Exception {
     Configuration namenodeConf = new Configuration();
+    namenodeConf.setBoolean(DFSConfigKeys.HADOOP_CALLER_CONTEXT_ENABLED_KEY,
+        true);
     // It's very easy to become overloaded for some specific dn in this small
     // cluster, which will cause the EC file block allocation failure. To avoid
     // this issue, we disable considerLoad option.
@@ -1841,4 +1844,27 @@ private DFSClient getFileDFSClient(final String path) {
     }
     return null;
   }
-}
\ No newline at end of file
+
+  @Test
+  public void testMkdirsWithCallerContext() throws IOException {
+    GenericTestUtils.LogCapturer auditlog =
+        GenericTestUtils.LogCapturer.captureLogs(FSNamesystem.auditLog);
+
+    // Current callerContext is null
+    assertNull(CallerContext.getCurrent());
+
+    // Set client context
+    CallerContext.setCurrent(
+        new CallerContext.Builder("clientContext").build());
+
+    // Create a directory via the router
+    String dirPath = "/test_dir_with_callercontext";
+    FsPermission permission = new FsPermission("755");
+    routerProtocol.mkdirs(dirPath, permission, false);
+
+    // The audit log should contain "callerContext=clientContext,clientIp:"
+    assertTrue(auditlog.getOutput()
+        .contains("callerContext=clientContext,clientIp:"));
+    assertTrue(verifyFileExists(routerFS, dirPath));
+  }
+}
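
Illustration (not part of the patch): a minimal, self-contained sketch of what this change does end to end. A client sets its own CallerContext; when the Router proxies the call, RouterRpcClient#appendClientIpToCallerContext() rebuilds that context with the caller's address appended, and the NameNode audit log then records it as callerContext=clientContext,clientIp:<ip>. The sketch assumes the default "," field separator, uses a placeholder IP literal where the real code reads Server.getRemoteAddress(), and the class name CallerContextAppendDemo is hypothetical.

import org.apache.hadoop.ipc.CallerContext;

public class CallerContextAppendDemo {
  public static void main(String[] args) {
    // The client sets its own context before calling the router.
    CallerContext.setCurrent(
        new CallerContext.Builder("clientContext").build());

    // The router rebuilds the context before proxying to the NameNode,
    // appending the remote client's address via the new two-argument
    // Builder(context, separator) constructor added by this patch.
    CallerContext original = CallerContext.getCurrent();
    CallerContext forwarded =
        new CallerContext.Builder(original.getContext(), ",")
            .append("clientIp", "192.168.1.10") // placeholder for Server.getRemoteAddress()
            .setSignature(original.getSignature())
            .build();

    // Expected to print "clientContext,clientIp:192.168.1.10", which is the
    // form the NameNode audit log captures as callerContext=...
    System.out.println(forwarded.getContext());
  }
}

The two-argument Builder(context, separator) also lets the single-argument Builder(context) delegate to a constant separator instead of constructing a new Configuration per call, which appears to be why the patch changes that constructor as well.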