HDFS-15630. RBF: Fix wrong client IP info in CallerContext when requests mount points with multi-destinations. Contributed by Chengwei Wang

This commit is contained in:
Hui Fei 2020-10-23 13:41:43 +08:00
parent d259928035
commit 264c948e60
2 changed files with 91 additions and 12 deletions

View File

@ -72,6 +72,7 @@
import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.RetriableException; import org.apache.hadoop.ipc.RetriableException;
import org.apache.hadoop.ipc.Server; import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ipc.Server.Call;
import org.apache.hadoop.ipc.StandbyException; import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.net.ConnectTimeoutException; import org.apache.hadoop.net.ConnectTimeoutException;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
@ -414,7 +415,8 @@ private Object invokeMethod(
" with params " + Arrays.deepToString(params) + " from " " with params " + Arrays.deepToString(params) + " from "
+ router.getRouterId()); + router.getRouterId());
} }
appendClientIpToCallerContext();
appendClientIpToCallerContextIfAbsent();
Object ret = null; Object ret = null;
if (rpcMonitor != null) { if (rpcMonitor != null) {
@ -531,17 +533,26 @@ private Object invokeMethod(
} }
/** /**
* For Tracking which is the actual client address. * For tracking which is the actual client address.
* It adds key/value (clientIp/"ip") pair to the caller context. * It adds trace info "clientIp:ip" to caller context if it's absent.
*/ */
private void appendClientIpToCallerContext() { private void appendClientIpToCallerContextIfAbsent() {
String clientIpInfo = CLIENT_IP_STR + ":" + Server.getRemoteAddress();
final CallerContext ctx = CallerContext.getCurrent(); final CallerContext ctx = CallerContext.getCurrent();
String origContext = ctx == null ? null : ctx.getContext(); if (isClientIpInfoAbsent(clientIpInfo, ctx)) {
byte[] origSignature = ctx == null ? null : ctx.getSignature(); String origContext = ctx == null ? null : ctx.getContext();
CallerContext.setCurrent( byte[] origSignature = ctx == null ? null : ctx.getSignature();
new CallerContext.Builder(origContext, contextFieldSeparator) CallerContext.setCurrent(
.append(CLIENT_IP_STR, Server.getRemoteAddress()) new CallerContext.Builder(origContext, contextFieldSeparator)
.setSignature(origSignature).build()); .append(clientIpInfo)
.setSignature(origSignature)
.build());
}
}
private boolean isClientIpInfoAbsent(String clientIpInfo, CallerContext ctx){
return ctx == null || ctx.getContext() == null
|| !ctx.getContext().contains(clientIpInfo);
} }
/** /**
@ -1303,6 +1314,9 @@ public <T extends RemoteLocationContext, R> Map<T, R> invokeConcurrent(
List<T> orderedLocations = new ArrayList<>(); List<T> orderedLocations = new ArrayList<>();
List<Callable<Object>> callables = new ArrayList<>(); List<Callable<Object>> callables = new ArrayList<>();
// transfer originCall & callerContext to worker threads of executor.
final Call originCall = Server.getCurCall().get();
final CallerContext originContext = CallerContext.getCurrent();
for (final T location : locations) { for (final T location : locations) {
String nsId = location.getNameserviceId(); String nsId = location.getNameserviceId();
final List<? extends FederationNamenodeContext> namenodes = final List<? extends FederationNamenodeContext> namenodes =
@ -1320,12 +1334,20 @@ public <T extends RemoteLocationContext, R> Map<T, R> invokeConcurrent(
nnLocation = (T)new RemoteLocation(nsId, nnId, location.getDest()); nnLocation = (T)new RemoteLocation(nsId, nnId, location.getDest());
} }
orderedLocations.add(nnLocation); orderedLocations.add(nnLocation);
callables.add(() -> invokeMethod(ugi, nnList, proto, m, paramList)); callables.add(
() -> {
transferThreadLocalContext(originCall, originContext);
return invokeMethod(ugi, nnList, proto, m, paramList);
});
} }
} else { } else {
// Call the objectGetter in order of nameservices in the NS list // Call the objectGetter in order of nameservices in the NS list
orderedLocations.add(location); orderedLocations.add(location);
callables.add(() -> invokeMethod(ugi, namenodes, proto, m, paramList)); callables.add(
() -> {
transferThreadLocalContext(originCall, originContext);
return invokeMethod(ugi, namenodes, proto, m, paramList);
});
} }
} }
@ -1392,6 +1414,20 @@ public <T extends RemoteLocationContext, R> Map<T, R> invokeConcurrent(
} }
} }
/**
* Transfer origin thread local context which is necessary to current
* worker thread when invoking method concurrently by executor service.
*
* @param originCall origin Call required for getting remote client ip.
* @param originContext origin CallerContext which should be transferred
* to server side.
*/
private void transferThreadLocalContext(
final Call originCall, final CallerContext originContext) {
Server.getCurCall().set(originCall);
CallerContext.setCurrent(originContext);
}
/** /**
* Get a prioritized list of NNs that share the same nameservice ID (in the * Get a prioritized list of NNs that share the same nameservice ID (in the
* same namespace). NNs that are reported as ACTIVE will be first in the list. * same namespace). NNs that are reported as ACTIVE will be first in the list.

View File

@ -32,6 +32,7 @@
import java.io.IOException; import java.io.IOException;
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.net.InetAddress;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.util.Arrays; import java.util.Arrays;
import java.util.EnumSet; import java.util.EnumSet;
@ -67,6 +68,7 @@
import org.apache.hadoop.hdfs.server.namenode.ha.HAContext; import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
import org.apache.hadoop.io.EnumSetWritable; import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.CallerContext;
import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.StandbyException; import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
@ -434,4 +436,45 @@ public void testSubclusterDown() throws Exception {
setInternalState(ns0, "haContext", nn0haCtx); setInternalState(ns0, "haContext", nn0haCtx);
setInternalState(router0ClientProtocol, "allowPartialList", true); setInternalState(router0ClientProtocol, "allowPartialList", true);
} }
@Test
public void testCallerContextWithMultiDestinations() throws IOException {
GenericTestUtils.LogCapturer auditLog =
GenericTestUtils.LogCapturer.captureLogs(FSNamesystem.auditLog);
// set client context
CallerContext.setCurrent(
new CallerContext.Builder("clientContext").build());
// assert the initial caller context as expected
assertEquals("clientContext", CallerContext.getCurrent().getContext());
DistributedFileSystem routerFs =
(DistributedFileSystem) getRouterFileSystem();
// create a directory via the router
Path dirPath = new Path("/test_caller_context_with_multi_destinations");
routerFs.mkdirs(dirPath);
// invoke concurrently in RouterRpcClient
routerFs.listStatus(dirPath);
// invoke sequentially in RouterRpcClient
routerFs.getFileStatus(dirPath);
String auditFlag = "src=" + dirPath.toString();
String clientIpInfo = "clientIp:"
+ InetAddress.getLocalHost().getHostAddress();
for (String line : auditLog.getOutput().split("\n")) {
if (line.contains(auditFlag)) {
// assert origin caller context exist in audit log
assertTrue(line.contains("callerContext=clientContext"));
String callerContext = line.substring(
line.indexOf("callerContext=clientContext"));
// assert client ip info exist in caller context
assertTrue(callerContext.contains(clientIpInfo));
// assert client ip info appears only once in caller context
assertEquals(callerContext.indexOf(clientIpInfo),
callerContext.lastIndexOf(clientIpInfo));
}
}
// clear client context
CallerContext.setCurrent(null);
}
} }