HDFS-13293. RBF: The RouterRPCServer should transfer client IP via CallerContext to NamenodeRpcServer (#2363)
Cherry-picked from 518a212c
by Owen O'Malley
This commit is contained in:
parent
5a38ed2f22
commit
8e129e5b8d
@ -129,7 +129,7 @@ public static final class Builder {
|
|||||||
private byte[] signature;
|
private byte[] signature;
|
||||||
|
|
||||||
public Builder(String context) {
|
public Builder(String context) {
|
||||||
this(context, new Configuration());
|
this(context, HADOOP_CALLER_CONTEXT_SEPARATOR_DEFAULT);
|
||||||
}
|
}
|
||||||
|
|
||||||
public Builder(String context, Configuration conf) {
|
public Builder(String context, Configuration conf) {
|
||||||
@ -141,6 +141,14 @@ public Builder(String context, Configuration conf) {
|
|||||||
checkFieldSeparator(fieldSeparator);
|
checkFieldSeparator(fieldSeparator);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Builder(String context, String separator) {
|
||||||
|
if (isValid(context)) {
|
||||||
|
sb.append(context);
|
||||||
|
}
|
||||||
|
fieldSeparator = separator;
|
||||||
|
checkFieldSeparator(fieldSeparator);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Check whether the separator is legal.
|
* Check whether the separator is legal.
|
||||||
* The illegal separators include '\t', '\n', '='.
|
* The illegal separators include '\t', '\n', '='.
|
||||||
|
@ -18,6 +18,8 @@
|
|||||||
|
|
||||||
package org.apache.hadoop.hdfs.server.federation.router;
|
package org.apache.hadoop.hdfs.server.federation.router;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_SEPARATOR_DEFAULT;
|
||||||
|
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_SEPARATOR_KEY;
|
||||||
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY;
|
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY;
|
||||||
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_TIMEOUT_KEY;
|
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_TIMEOUT_KEY;
|
||||||
|
|
||||||
@ -66,8 +68,10 @@
|
|||||||
import org.apache.hadoop.io.retry.RetryPolicies;
|
import org.apache.hadoop.io.retry.RetryPolicies;
|
||||||
import org.apache.hadoop.io.retry.RetryPolicy;
|
import org.apache.hadoop.io.retry.RetryPolicy;
|
||||||
import org.apache.hadoop.io.retry.RetryPolicy.RetryAction.RetryDecision;
|
import org.apache.hadoop.io.retry.RetryPolicy.RetryAction.RetryDecision;
|
||||||
|
import org.apache.hadoop.ipc.CallerContext;
|
||||||
import org.apache.hadoop.ipc.RemoteException;
|
import org.apache.hadoop.ipc.RemoteException;
|
||||||
import org.apache.hadoop.ipc.RetriableException;
|
import org.apache.hadoop.ipc.RetriableException;
|
||||||
|
import org.apache.hadoop.ipc.Server;
|
||||||
import org.apache.hadoop.ipc.StandbyException;
|
import org.apache.hadoop.ipc.StandbyException;
|
||||||
import org.apache.hadoop.net.ConnectTimeoutException;
|
import org.apache.hadoop.net.ConnectTimeoutException;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
@ -115,11 +119,14 @@ public class RouterRpcClient {
|
|||||||
private final RetryPolicy retryPolicy;
|
private final RetryPolicy retryPolicy;
|
||||||
/** Optional perf monitor. */
|
/** Optional perf monitor. */
|
||||||
private final RouterRpcMonitor rpcMonitor;
|
private final RouterRpcMonitor rpcMonitor;
|
||||||
|
/** Field separator of CallerContext. */
|
||||||
|
private final String contextFieldSeparator;
|
||||||
|
|
||||||
/** Pattern to parse a stack trace line. */
|
/** Pattern to parse a stack trace line. */
|
||||||
private static final Pattern STACK_TRACE_PATTERN =
|
private static final Pattern STACK_TRACE_PATTERN =
|
||||||
Pattern.compile("\\tat (.*)\\.(.*)\\((.*):(\\d*)\\)");
|
Pattern.compile("\\tat (.*)\\.(.*)\\((.*):(\\d*)\\)");
|
||||||
|
|
||||||
|
private static final String CLIENT_IP_STR = "clientIp";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a router RPC client to manage remote procedure calls to NNs.
|
* Create a router RPC client to manage remote procedure calls to NNs.
|
||||||
@ -136,6 +143,9 @@ public RouterRpcClient(Configuration conf, Router router,
|
|||||||
this.namenodeResolver = resolver;
|
this.namenodeResolver = resolver;
|
||||||
|
|
||||||
Configuration clientConf = getClientConfiguration(conf);
|
Configuration clientConf = getClientConfiguration(conf);
|
||||||
|
this.contextFieldSeparator =
|
||||||
|
clientConf.get(HADOOP_CALLER_CONTEXT_SEPARATOR_KEY,
|
||||||
|
HADOOP_CALLER_CONTEXT_SEPARATOR_DEFAULT);
|
||||||
this.connectionManager = new ConnectionManager(clientConf);
|
this.connectionManager = new ConnectionManager(clientConf);
|
||||||
this.connectionManager.start();
|
this.connectionManager.start();
|
||||||
|
|
||||||
@ -404,6 +414,7 @@ private Object invokeMethod(
|
|||||||
" with params " + Arrays.deepToString(params) + " from "
|
" with params " + Arrays.deepToString(params) + " from "
|
||||||
+ router.getRouterId());
|
+ router.getRouterId());
|
||||||
}
|
}
|
||||||
|
appendClientIpToCallerContext();
|
||||||
|
|
||||||
Object ret = null;
|
Object ret = null;
|
||||||
if (rpcMonitor != null) {
|
if (rpcMonitor != null) {
|
||||||
@ -519,6 +530,20 @@ private Object invokeMethod(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* For Tracking which is the actual client address.
|
||||||
|
* It adds key/value (clientIp/"ip") pair to the caller context.
|
||||||
|
*/
|
||||||
|
private void appendClientIpToCallerContext() {
|
||||||
|
final CallerContext ctx = CallerContext.getCurrent();
|
||||||
|
String origContext = ctx == null ? null : ctx.getContext();
|
||||||
|
byte[] origSignature = ctx == null ? null : ctx.getSignature();
|
||||||
|
CallerContext.setCurrent(
|
||||||
|
new CallerContext.Builder(origContext, contextFieldSeparator)
|
||||||
|
.append(CLIENT_IP_STR, Server.getRemoteAddress())
|
||||||
|
.setSignature(origSignature).build());
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Invokes a method on the designated object. Catches exceptions specific to
|
* Invokes a method on the designated object. Catches exceptions specific to
|
||||||
* the invocation.
|
* the invocation.
|
||||||
|
@ -118,6 +118,7 @@
|
|||||||
import org.apache.hadoop.io.EnumSetWritable;
|
import org.apache.hadoop.io.EnumSetWritable;
|
||||||
import org.apache.hadoop.io.erasurecode.ECSchema;
|
import org.apache.hadoop.io.erasurecode.ECSchema;
|
||||||
import org.apache.hadoop.io.erasurecode.ErasureCodeConstants;
|
import org.apache.hadoop.io.erasurecode.ErasureCodeConstants;
|
||||||
|
import org.apache.hadoop.ipc.CallerContext;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.service.Service.STATE;
|
import org.apache.hadoop.service.Service.STATE;
|
||||||
import org.apache.hadoop.test.GenericTestUtils;
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
@ -196,6 +197,8 @@ public int compare(
|
|||||||
@BeforeClass
|
@BeforeClass
|
||||||
public static void globalSetUp() throws Exception {
|
public static void globalSetUp() throws Exception {
|
||||||
Configuration namenodeConf = new Configuration();
|
Configuration namenodeConf = new Configuration();
|
||||||
|
namenodeConf.setBoolean(DFSConfigKeys.HADOOP_CALLER_CONTEXT_ENABLED_KEY,
|
||||||
|
true);
|
||||||
// It's very easy to become overloaded for some specific dn in this small
|
// It's very easy to become overloaded for some specific dn in this small
|
||||||
// cluster, which will cause the EC file block allocation failure. To avoid
|
// cluster, which will cause the EC file block allocation failure. To avoid
|
||||||
// this issue, we disable considerLoad option.
|
// this issue, we disable considerLoad option.
|
||||||
@ -1841,4 +1844,27 @@ private DFSClient getFileDFSClient(final String path) {
|
|||||||
}
|
}
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMkdirsWithCallerContext() throws IOException {
|
||||||
|
GenericTestUtils.LogCapturer auditlog =
|
||||||
|
GenericTestUtils.LogCapturer.captureLogs(FSNamesystem.auditLog);
|
||||||
|
|
||||||
|
// Current callerContext is null
|
||||||
|
assertNull(CallerContext.getCurrent());
|
||||||
|
|
||||||
|
// Set client context
|
||||||
|
CallerContext.setCurrent(
|
||||||
|
new CallerContext.Builder("clientContext").build());
|
||||||
|
|
||||||
|
// Create a directory via the router
|
||||||
|
String dirPath = "/test_dir_with_callercontext";
|
||||||
|
FsPermission permission = new FsPermission("755");
|
||||||
|
routerProtocol.mkdirs(dirPath, permission, false);
|
||||||
|
|
||||||
|
// The audit log should contains "callerContext=clientContext,clientIp:"
|
||||||
|
assertTrue(auditlog.getOutput()
|
||||||
|
.contains("callerContext=clientContext,clientIp:"));
|
||||||
|
assertTrue(verifyFileExists(routerFS, dirPath));
|
||||||
|
}
|
||||||
}
|
}
|
Loading…
Reference in New Issue
Block a user