HDFS-16310. RBF: Add client port to CallerContext for Router (#3635)
This commit is contained in:
parent
91af256a5b
commit
5b05068fdc
@ -132,6 +132,7 @@ public class RouterRpcClient {
|
|||||||
Pattern.compile("\\tat (.*)\\.(.*)\\((.*):(\\d*)\\)");
|
Pattern.compile("\\tat (.*)\\.(.*)\\((.*):(\\d*)\\)");
|
||||||
|
|
||||||
private static final String CLIENT_IP_STR = "clientIp";
|
private static final String CLIENT_IP_STR = "clientIp";
|
||||||
|
private static final String CLIENT_PORT_STR = "clientPort";
|
||||||
|
|
||||||
/** Fairness manager to control handlers assigned per NS. */
|
/** Fairness manager to control handlers assigned per NS. */
|
||||||
private RouterRpcFairnessPolicyController routerRpcFairnessPolicyController;
|
private RouterRpcFairnessPolicyController routerRpcFairnessPolicyController;
|
||||||
@ -465,7 +466,7 @@ private Object invokeMethod(
|
|||||||
+ router.getRouterId());
|
+ router.getRouterId());
|
||||||
}
|
}
|
||||||
|
|
||||||
appendClientIpToCallerContextIfAbsent();
|
appendClientIpPortToCallerContextIfAbsent();
|
||||||
|
|
||||||
Object ret = null;
|
Object ret = null;
|
||||||
if (rpcMonitor != null) {
|
if (rpcMonitor != null) {
|
||||||
@ -586,25 +587,20 @@ private Object invokeMethod(
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* For tracking which is the actual client address.
|
* For tracking which is the actual client address.
|
||||||
* It adds trace info "clientIp:ip" to caller context if it's absent.
|
* It adds trace info "clientIp:ip" and "clientPort:port"
|
||||||
|
* to caller context if they are absent.
|
||||||
*/
|
*/
|
||||||
private void appendClientIpToCallerContextIfAbsent() {
|
private void appendClientIpPortToCallerContextIfAbsent() {
|
||||||
String clientIpInfo = CLIENT_IP_STR + ":" + Server.getRemoteAddress();
|
CallerContext ctx = CallerContext.getCurrent();
|
||||||
final CallerContext ctx = CallerContext.getCurrent();
|
String origContext = ctx == null ? null : ctx.getContext();
|
||||||
if (isClientIpInfoAbsent(clientIpInfo, ctx)) {
|
byte[] origSignature = ctx == null ? null : ctx.getSignature();
|
||||||
String origContext = ctx == null ? null : ctx.getContext();
|
CallerContext.setCurrent(
|
||||||
byte[] origSignature = ctx == null ? null : ctx.getSignature();
|
new CallerContext.Builder(origContext, contextFieldSeparator)
|
||||||
CallerContext.setCurrent(
|
.appendIfAbsent(CLIENT_IP_STR, Server.getRemoteAddress())
|
||||||
new CallerContext.Builder(origContext, contextFieldSeparator)
|
.appendIfAbsent(CLIENT_PORT_STR,
|
||||||
.append(clientIpInfo)
|
Integer.toString(Server.getRemotePort()))
|
||||||
.setSignature(origSignature)
|
.setSignature(origSignature)
|
||||||
.build());
|
.build());
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private boolean isClientIpInfoAbsent(String clientIpInfo, CallerContext ctx){
|
|
||||||
return ctx == null || ctx.getContext() == null
|
|
||||||
|| !ctx.getContext().contains(clientIpInfo);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -1967,4 +1967,39 @@ public void testSetBalancerBandwidth() throws Exception {
|
|||||||
return datanodes.get(0).getBalancerBandwidth() == newBandwidth;
|
return datanodes.get(0).getBalancerBandwidth() == newBandwidth;
|
||||||
}, 100, 60 * 1000);
|
}, 100, 60 * 1000);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAddClientIpPortToCallerContext() throws IOException {
|
||||||
|
GenericTestUtils.LogCapturer auditLog =
|
||||||
|
GenericTestUtils.LogCapturer.captureLogs(FSNamesystem.auditLog);
|
||||||
|
|
||||||
|
// 1. ClientIp and ClientPort are not set on the client.
|
||||||
|
// Set client context.
|
||||||
|
CallerContext.setCurrent(
|
||||||
|
new CallerContext.Builder("clientContext").build());
|
||||||
|
|
||||||
|
// Create a directory via the router.
|
||||||
|
String dirPath = "/test";
|
||||||
|
routerProtocol.mkdirs(dirPath, new FsPermission("755"), false);
|
||||||
|
|
||||||
|
// The audit log should contains "clientIp:" and "clientPort:".
|
||||||
|
assertTrue(auditLog.getOutput().contains("clientIp:"));
|
||||||
|
assertTrue(auditLog.getOutput().contains("clientPort:"));
|
||||||
|
assertTrue(verifyFileExists(routerFS, dirPath));
|
||||||
|
auditLog.clearOutput();
|
||||||
|
|
||||||
|
// 2. ClientIp and ClientPort are set on the client.
|
||||||
|
// Reset client context.
|
||||||
|
CallerContext.setCurrent(
|
||||||
|
new CallerContext.Builder(
|
||||||
|
"clientContext,clientIp:1.1.1.1,clientPort:1234").build());
|
||||||
|
|
||||||
|
// Create a directory via the router.
|
||||||
|
routerProtocol.getFileInfo(dirPath);
|
||||||
|
|
||||||
|
// The audit log should contains the original clientIp and clientPort
|
||||||
|
// set by client.
|
||||||
|
assertTrue(auditLog.getOutput().contains("clientIp:1.1.1.1"));
|
||||||
|
assertTrue(auditLog.getOutput().contains("clientPort:1234"));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -462,8 +462,7 @@ private void logAuditEvent(boolean succeeded,
|
|||||||
|
|
||||||
private void appendClientPortToCallerContextIfAbsent() {
|
private void appendClientPortToCallerContextIfAbsent() {
|
||||||
final CallerContext ctx = CallerContext.getCurrent();
|
final CallerContext ctx = CallerContext.getCurrent();
|
||||||
if (isClientPortInfoAbsent(CLIENT_PORT_STR + ":" + Server.getRemotePort(),
|
if (isClientPortInfoAbsent(ctx)) {
|
||||||
ctx)) {
|
|
||||||
String origContext = ctx == null ? null : ctx.getContext();
|
String origContext = ctx == null ? null : ctx.getContext();
|
||||||
byte[] origSignature = ctx == null ? null : ctx.getSignature();
|
byte[] origSignature = ctx == null ? null : ctx.getSignature();
|
||||||
CallerContext.setCurrent(
|
CallerContext.setCurrent(
|
||||||
@ -474,9 +473,9 @@ private void appendClientPortToCallerContextIfAbsent() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean isClientPortInfoAbsent(String clientPortInfo, CallerContext ctx){
|
private boolean isClientPortInfoAbsent(CallerContext ctx){
|
||||||
return ctx == null || ctx.getContext() == null
|
return ctx == null || ctx.getContext() == null
|
||||||
|| !ctx.getContext().contains(clientPortInfo);
|
|| !ctx.getContext().contains(CLIENT_PORT_STR);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
Loading…
Reference in New Issue
Block a user