HDFS-15079. RBF: Namenode needs to use the actual client Id and callId when going through RBF proxy. (#4530)

This commit is contained in:
xuzq 2022-07-23 22:19:37 +08:00 committed by GitHub
parent 5c84cb81ba
commit 2c96357051
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 395 additions and 108 deletions

View File

@ -46,6 +46,10 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
public static final Logger LOG = LoggerFactory.getLogger(
RetryInvocationHandler.class);
@VisibleForTesting
public static final ThreadLocal<Boolean> SET_CALL_ID_FOR_TEST =
ThreadLocal.withInitial(() -> true);
static class Call {
private final Method method;
private final Object[] args;
@ -159,7 +163,7 @@ CallReturn invoke() throws Throwable {
}
Object invokeMethod() throws Throwable {
if (isRpc) {
if (isRpc && SET_CALL_ID_FOR_TEST.get()) {
Client.setCallIdAndRetryCount(callId, counters.retries,
retryInvocationHandler.asyncCallHandler);
}

View File

@ -47,6 +47,8 @@ public final class CallerContext {
// field names
public static final String CLIENT_IP_STR = "clientIp";
public static final String CLIENT_PORT_STR = "clientPort";
public static final String CLIENT_ID_STR = "clientId";
public static final String CLIENT_CALL_ID_STR = "clientCallId";
/** The caller context.
*

View File

@ -55,14 +55,14 @@ public static class CacheEntry implements LightWeightCache.Entry {
/**
* Processing state of the requests.
*/
private static byte INPROGRESS = 0;
private static byte SUCCESS = 1;
private static byte FAILED = 2;
private static final byte INPROGRESS = 0;
private static final byte SUCCESS = 1;
private static final byte FAILED = 2;
private byte state = INPROGRESS;
// Store uuid as two long for better memory utilization
private final long clientIdMsb; // Most signficant bytes
private final long clientIdMsb; // Most significant bytes
private final long clientIdLsb; // Least significant bytes
private final int callId;
@ -140,8 +140,8 @@ public long getExpirationTime() {
@Override
public String toString() {
return (new UUID(this.clientIdMsb, this.clientIdLsb)).toString() + ":"
+ this.callId + ":" + this.state;
return String.format("%s:%s:%s", new UUID(this.clientIdMsb, this.clientIdLsb),
this.callId, this.state);
}
}
@ -183,7 +183,7 @@ public Object getPayload() {
private final LightWeightGSet<CacheEntry, CacheEntry> set;
private final long expirationTime;
private String cacheName;
private final String cacheName;
private final ReentrantLock lock = new ReentrantLock();
@ -195,7 +195,7 @@ public Object getPayload() {
*/
public RetryCache(String cacheName, double percentage, long expirationTime) {
int capacity = LightWeightGSet.computeCapacity(percentage, cacheName);
capacity = capacity > MAX_CAPACITY ? capacity : MAX_CAPACITY;
capacity = Math.max(capacity, MAX_CAPACITY);
this.set = new LightWeightCache<CacheEntry, CacheEntry>(capacity, capacity,
expirationTime, 0);
this.expirationTime = expirationTime;
@ -203,11 +203,11 @@ public RetryCache(String cacheName, double percentage, long expirationTime) {
this.retryCacheMetrics = RetryCacheMetrics.create(this);
}
private static boolean skipRetryCache() {
private static boolean skipRetryCache(byte[] clientId, int callId) {
// Do not track non RPC invocation or RPC requests with
// invalid callId or clientId in retry cache
return !Server.isRpcInvocation() || Server.getCallId() < 0
|| Arrays.equals(Server.getClientId(), RpcConstants.DUMMY_CLIENT_ID);
return !Server.isRpcInvocation() || callId < 0
|| Arrays.equals(clientId, RpcConstants.DUMMY_CLIENT_ID);
}
public void lock() {
@ -332,43 +332,51 @@ public void addCacheEntryWithPayload(byte[] clientId, int callId,
retryCacheMetrics.incrCacheUpdated();
}
private static CacheEntry newEntry(long expirationTime) {
return new CacheEntry(Server.getClientId(), Server.getCallId(),
private static CacheEntry newEntry(long expirationTime,
byte[] clientId, int callId) {
return new CacheEntry(clientId, callId,
System.nanoTime() + expirationTime);
}
private static CacheEntryWithPayload newEntry(Object payload,
long expirationTime) {
return new CacheEntryWithPayload(Server.getClientId(), Server.getCallId(),
long expirationTime, byte[] clientId, int callId) {
return new CacheEntryWithPayload(clientId, callId,
payload, System.nanoTime() + expirationTime);
}
/**
* Static method that provides null check for retryCache.
* @param cache input Cache.
* @param clientId client id of this request
* @param callId client call id of this request
* @return CacheEntry.
*/
public static CacheEntry waitForCompletion(RetryCache cache) {
if (skipRetryCache()) {
public static CacheEntry waitForCompletion(RetryCache cache,
byte[] clientId, int callId) {
if (skipRetryCache(clientId, callId)) {
return null;
}
return cache != null ? cache
.waitForCompletion(newEntry(cache.expirationTime)) : null;
.waitForCompletion(newEntry(cache.expirationTime,
clientId, callId)) : null;
}
/**
* Static method that provides null check for retryCache.
* @param cache input cache.
* @param payload input payload.
* @param clientId client id of this request
* @param callId client call id of this request
* @return CacheEntryWithPayload.
*/
public static CacheEntryWithPayload waitForCompletion(RetryCache cache,
Object payload) {
if (skipRetryCache()) {
Object payload, byte[] clientId, int callId) {
if (skipRetryCache(clientId, callId)) {
return null;
}
return (CacheEntryWithPayload) (cache != null ? cache
.waitForCompletion(newEntry(payload, cache.expirationTime)) : null);
.waitForCompletion(newEntry(payload, cache.expirationTime,
clientId, callId)) : null);
}
public static void setState(CacheEntry e, boolean success) {

View File

@ -50,14 +50,14 @@ public void setup() {
static class TestServer {
AtomicInteger retryCount = new AtomicInteger();
AtomicInteger operationCount = new AtomicInteger();
private RetryCache retryCache = new RetryCache("TestRetryCache", 1,
100 * 1000 * 1000 * 1000L);
private final RetryCache retryCache = new RetryCache(
"TestRetryCache", 1, 100 * 1000 * 1000 * 1000L);
/**
* A server method implemented using {@link RetryCache}.
*
* @param input is returned back in echo, if {@code success} is true.
* @param failureOuput returned on failure, if {@code success} is false.
* @param failureOutput returned on failure, if {@code success} is false.
* @param methodTime time taken by the operation. By passing smaller/larger
* value one can simulate an operation that takes short/long time.
* @param success whether this operation completes successfully or not
@ -67,7 +67,7 @@ static class TestServer {
int echo(int input, int failureOutput, long methodTime, boolean success)
throws InterruptedException {
CacheEntryWithPayload entry = RetryCache.waitForCompletion(retryCache,
null);
null, Server.getClientId(), Server.getCallId());
if (entry != null && entry.isSuccess()) {
System.out.println("retryCount incremented " + retryCount.get());
retryCount.incrementAndGet();
@ -173,16 +173,13 @@ public void testOperations(final int input, final int numberOfThreads,
final int failureOutput = input + 1;
ExecutorService executorService = Executors
.newFixedThreadPool(numberOfThreads);
List<Future<Integer>> list = new ArrayList<Future<Integer>>();
List<Future<Integer>> list = new ArrayList<>();
for (int i = 0; i < numberOfThreads; i++) {
Callable<Integer> worker = new Callable<Integer>() {
@Override
public Integer call() throws Exception {
Server.getCurCall().set(call);
Assert.assertEquals(Server.getCurCall().get(), call);
int randomPause = pause == 0 ? pause : r.nextInt(pause);
return testServer.echo(input, failureOutput, randomPause, success);
}
Callable<Integer> worker = () -> {
Server.getCurCall().set(call);
Assert.assertEquals(Server.getCurCall().get(), call);
int randomPause = pause == 0 ? pause : r.nextInt(pause);
return testServer.echo(input, failureOutput, randomPause, success);
};
Future<Integer> submit = executorService.submit(worker);
list.add(submit);

View File

@ -80,6 +80,7 @@
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.net.ConnectTimeoutException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
import org.eclipse.jetty.util.ajax.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -464,7 +465,7 @@ public Object invokeMethod(
+ router.getRouterId());
}
addClientIpToCallerContext();
addClientInfoToCallerContext();
Object ret = null;
if (rpcMonitor != null) {
@ -584,12 +585,13 @@ public Object invokeMethod(
}
/**
* For tracking which is the actual client address.
* It adds trace info "clientIp:ip" and "clientPort:port"
* For tracking some information about the actual client.
* It adds trace info "clientIp:ip", "clientPort:port",
* "clientId:id" and "clientCallId:callId"
* in the caller context, removing the old values if they were
* already present.
*/
private void addClientIpToCallerContext() {
private void addClientInfoToCallerContext() {
CallerContext ctx = CallerContext.getCurrent();
String origContext = ctx == null ? null : ctx.getContext();
byte[] origSignature = ctx == null ? null : ctx.getSignature();
@ -598,6 +600,10 @@ private void addClientIpToCallerContext() {
.append(CallerContext.CLIENT_IP_STR, Server.getRemoteAddress())
.append(CallerContext.CLIENT_PORT_STR,
Integer.toString(Server.getRemotePort()))
.append(CallerContext.CLIENT_ID_STR,
StringUtils.byteToHexString(Server.getClientId()))
.append(CallerContext.CLIENT_CALL_ID_STR,
Integer.toString(Server.getCallId()))
.setSignature(origSignature);
// Append the original caller context
if (origContext != null) {

View File

@ -0,0 +1,144 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.router;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.io.retry.RetryInvocationHandler;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.security.UserGroupInformation;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_IP_PROXY_USERS;
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
public class TestRouterRetryCache {
/** Federated HDFS cluster. */
private MiniRouterDFSCluster cluster;
@Before
public void setup() throws Exception {
Configuration namenodeConf = new Configuration();
namenodeConf.set(DFS_NAMENODE_IP_PROXY_USERS, "fake_joe");
cluster = new MiniRouterDFSCluster(true, 1);
cluster.addNamenodeOverrides(namenodeConf);
// Start NNs and DNs and wait until ready
cluster.startCluster();
// Start routers with only an RPC service
cluster.startRouters();
// Register and verify all NNs with all routers
cluster.registerNamenodes();
cluster.waitNamenodeRegistration();
// Setup the mount table
cluster.installMockLocations();
// Making one Namenodes active per nameservice
if (cluster.isHighAvailability()) {
for (String ns : cluster.getNameservices()) {
cluster.switchToActive(ns, NAMENODES[0]);
cluster.switchToStandby(ns, NAMENODES[1]);
}
}
cluster.waitActiveNamespaces();
}
@After
public void teardown() throws IOException {
if (cluster != null) {
cluster.shutdown();
cluster = null;
}
}
@Test
public void testRetryCache() throws Exception {
RetryInvocationHandler.SET_CALL_ID_FOR_TEST.set(false);
FileSystem routerFS = cluster.getRandomRouter().getFileSystem();
Path testDir = new Path("/target-ns0/testdir");
routerFS.mkdirs(testDir);
routerFS.setPermission(testDir, FsPermission.getDefault());
// Run as fake joe to authorize the test
UserGroupInformation joe =
UserGroupInformation.createUserForTesting("fake_joe",
new String[]{"fake_group"});
FileSystem joeFS = joe.doAs(
(PrivilegedExceptionAction<FileSystem>) () ->
FileSystem.newInstance(routerFS.getUri(), routerFS.getConf()));
Path renameSrc = new Path(testDir, "renameSrc");
Path renameDst = new Path(testDir, "renameDst");
joeFS.mkdirs(renameSrc);
assertEquals(HAServiceProtocol.HAServiceState.ACTIVE,
cluster.getCluster().getNamesystem(0).getState());
int callId = Client.nextCallId();
Client.setCallIdAndRetryCount(callId, 0, null);
assertTrue(joeFS.rename(renameSrc, renameDst));
Client.setCallIdAndRetryCount(callId, 0, null);
assertTrue(joeFS.rename(renameSrc, renameDst));
String ns0 = cluster.getNameservices().get(0);
cluster.switchToStandby(ns0, NAMENODES[0]);
cluster.switchToActive(ns0, NAMENODES[1]);
assertEquals(HAServiceProtocol.HAServiceState.ACTIVE,
cluster.getCluster().getNamesystem(1).getState());
Client.setCallIdAndRetryCount(callId, 0, null);
assertTrue(joeFS.rename(renameSrc, renameDst));
}
@Test
public void testParseSpecialValue() {
String mockContent = "mockContent,clientIp:127.0.0.1," +
"clientCallId:12345,clientId:mockClientId";
String clientIp = NameNode.parseSpecialValue(mockContent, "clientIp:");
assertEquals("127.0.0.1", clientIp);
String clientCallId = NameNode.parseSpecialValue(
mockContent, "clientCallId:");
assertEquals("12345", clientCallId);
String clientId = NameNode.parseSpecialValue(mockContent, "clientId:");
assertEquals("mockClientId", clientId);
String clientRetryNum = NameNode.parseSpecialValue(
mockContent, "clientRetryNum:");
assertNull(clientRetryNum);
}
}

View File

@ -2053,6 +2053,8 @@ public void testMkdirsWithCallerContext() throws IOException {
final String logOutput = auditlog.getOutput();
assertTrue(logOutput.contains("callerContext=clientIp:"));
assertTrue(logOutput.contains(",clientContext"));
assertTrue(logOutput.contains(",clientId"));
assertTrue(logOutput.contains(",clientCallId"));
assertTrue(verifyFileExists(routerFS, dirPath));
}
@ -2103,6 +2105,41 @@ public void testAddClientIpPortToCallerContext() throws IOException {
assertFalse(auditLog.getOutput().contains("clientPort:1234"));
}
@Test
public void testAddClientIdAndCallIdToCallerContext() throws IOException {
GenericTestUtils.LogCapturer auditLog =
GenericTestUtils.LogCapturer.captureLogs(FSNamesystem.auditLog);
// 1. ClientId and ClientCallId are not set on the client.
// Set client context.
CallerContext.setCurrent(
new CallerContext.Builder("clientContext").build());
// Create a directory via the router.
String dirPath = "/test";
routerProtocol.mkdirs(dirPath, new FsPermission("755"), false);
// The audit log should contains "clientId:" and "clientCallId:".
assertTrue(auditLog.getOutput().contains("clientId:"));
assertTrue(auditLog.getOutput().contains("clientCallId:"));
assertTrue(verifyFileExists(routerFS, dirPath));
auditLog.clearOutput();
// 2. ClientId and ClientCallId are set on the client.
// Reset client context.
CallerContext.setCurrent(
new CallerContext.Builder(
"clientContext,clientId:mockClientId,clientCallId:4321").build());
// Create a directory via the router.
routerProtocol.getFileInfo(dirPath);
// The audit log should not contain the original clientId and clientCallId
// set by client.
assertFalse(auditLog.getOutput().contains("clientId:mockClientId"));
assertFalse(auditLog.getOutput().contains("clientCallId:4321"));
}
@Test
public void testContentSummaryWithSnapshot() throws Exception {
DistributedFileSystem routerDFS = (DistributedFileSystem) routerFS;

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_IP_PROXY_USERS;
import static org.apache.hadoop.util.ExitUtil.terminate;
import static org.apache.hadoop.util.Time.monotonicNow;
@ -30,6 +31,7 @@
import java.util.List;
import java.util.concurrent.atomic.LongAdder;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
@ -107,7 +109,6 @@
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.util.Lists;
@ -195,6 +196,9 @@ private enum State {
protected final OpInstanceCache cache = new OpInstanceCache();
// Users who can override the client ip
private final String[] ipProxyUsers;
/**
* The edit directories that are shared between primary and secondary.
*/
@ -246,6 +250,7 @@ static FSEditLog newInstance(Configuration conf, NNStorage storage,
* @param editsDirs List of journals to use
*/
FSEditLog(Configuration conf, NNStorage storage, List<URI> editsDirs) {
ipProxyUsers = conf.getStrings(DFS_NAMENODE_IP_PROXY_USERS);
isSyncRunning = false;
this.conf = conf;
this.storage = storage;
@ -799,8 +804,10 @@ private void printStatistics(boolean force) {
/** Record the RPC IDs if necessary */
private void logRpcIds(FSEditLogOp op, boolean toLogRpcIds) {
if (toLogRpcIds) {
op.setRpcClientId(Server.getClientId());
op.setRpcCallId(Server.getCallId());
Pair<byte[], Integer> clientIdAndCallId =
NameNode.getClientIdAndCallId(this.ipProxyUsers);
op.setRpcClientId(clientIdAndCallId.getLeft());
op.setRpcCallId(clientIdAndCallId.getRight());
}
}

View File

@ -17,7 +17,10 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.ipc.CallerContext;
import org.apache.hadoop.thirdparty.com.google.common.base.Joiner;
import org.apache.hadoop.util.Preconditions;
@ -494,6 +497,94 @@ public static NameNodeMetrics getNameNodeMetrics() {
return metrics;
}
/**
* Try to obtain the actual client info according to the current user.
* @param ipProxyUsers Users who can override client infos
*/
private static String clientInfoFromContext(
final String[] ipProxyUsers) {
if (ipProxyUsers != null) {
UserGroupInformation user =
UserGroupInformation.getRealUserOrSelf(Server.getRemoteUser());
if (user != null &&
ArrayUtils.contains(ipProxyUsers, user.getShortUserName())) {
CallerContext context = CallerContext.getCurrent();
if (context != null && context.isContextValid()) {
return context.getContext();
}
}
}
return null;
}
/**
* Try to obtain the value corresponding to the key by parsing the content.
* @param content the full content to be parsed.
* @param key trying to obtain the value of the key.
* @return the value corresponding to the key.
*/
@VisibleForTesting
public static String parseSpecialValue(String content, String key) {
int posn = content.indexOf(key);
if (posn != -1) {
posn += key.length();
int end = content.indexOf(",", posn);
return end == -1 ? content.substring(posn)
: content.substring(posn, end);
}
return null;
}
/**
* Try to obtain the actual client's machine according to the current user.
* @param ipProxyUsers Users who can override client infos.
* @return The actual client's machine.
*/
public static String getClientMachine(final String[] ipProxyUsers) {
String cc = clientInfoFromContext(ipProxyUsers);
if (cc != null) {
// if the rpc has a caller context of "clientIp:1.2.3.4,CLI",
// return "1.2.3.4" as the client machine.
String key = CallerContext.CLIENT_IP_STR +
CallerContext.Builder.KEY_VALUE_SEPARATOR;
return parseSpecialValue(cc, key);
}
String clientMachine = Server.getRemoteAddress();
if (clientMachine == null) { //not a RPC client
clientMachine = "";
}
return clientMachine;
}
/**
* Try to obtain the actual client's id and call id
* according to the current user.
* @param ipProxyUsers Users who can override client infos
* @return The actual client's id and call id.
*/
public static Pair<byte[], Integer> getClientIdAndCallId(
final String[] ipProxyUsers) {
byte[] clientId = Server.getClientId();
int callId = Server.getCallId();
String cc = clientInfoFromContext(ipProxyUsers);
if (cc != null) {
String clientIdKey = CallerContext.CLIENT_ID_STR +
CallerContext.Builder.KEY_VALUE_SEPARATOR;
String clientIdStr = parseSpecialValue(cc, clientIdKey);
if (clientIdStr != null) {
clientId = StringUtils.hexStringToByte(clientIdStr);
}
String callIdKey = CallerContext.CLIENT_CALL_ID_STR +
CallerContext.Builder.KEY_VALUE_SEPARATOR;
String callIdStr = parseSpecialValue(cc, callIdKey);
if (callIdStr != null) {
callId = Integer.parseInt(callIdStr);
}
}
return Pair.of(clientId, callId);
}
/**
* Returns object used for reporting namenode startup progress.
*

View File

@ -46,8 +46,7 @@
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.hadoop.ipc.CallerContext;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
@ -271,7 +270,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
private final String defaultECPolicyName;
// Users who can override the client ip
// Users who can override the client info
private final String[] ipProxyUsers;
public NameNodeRpcServer(Configuration conf, NameNode nn)
@ -711,8 +710,7 @@ public NamenodeCommand startCheckpoint(NamenodeRegistration registration)
if(!nn.isRole(NamenodeRole.NAMENODE))
throw new IOException("Only an ACTIVE node can invoke startCheckpoint.");
CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache,
null);
CacheEntryWithPayload cacheEntry = getCacheEntryWithPayload(null);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return (NamenodeCommand) cacheEntry.getPayload();
}
@ -725,13 +723,33 @@ public NamenodeCommand startCheckpoint(NamenodeRegistration registration)
return ret;
}
/**
* Return the current CacheEntry.
*/
private CacheEntry getCacheEntry() {
Pair<byte[], Integer> clientInfo =
NameNode.getClientIdAndCallId(this.ipProxyUsers);
return RetryCache.waitForCompletion(
retryCache, clientInfo.getLeft(), clientInfo.getRight());
}
/**
* Return the current CacheEntryWithPayload.
*/
private CacheEntryWithPayload getCacheEntryWithPayload(Object payload) {
Pair<byte[], Integer> clientInfo =
NameNode.getClientIdAndCallId(this.ipProxyUsers);
return RetryCache.waitForCompletion(retryCache, payload,
clientInfo.getLeft(), clientInfo.getRight());
}
@Override // NamenodeProtocol
public void endCheckpoint(NamenodeRegistration registration,
CheckpointSignature sig) throws IOException {
String operationName = "endCheckpoint";
checkNNStartup();
namesystem.checkSuperuserPrivilege(operationName);
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
CacheEntry cacheEntry = getCacheEntry();
if (cacheEntry != null && cacheEntry.isSuccess()) {
return; // Return previous response
}
@ -801,7 +819,7 @@ public HdfsFileStatus create(String src, FsPermission masked,
+ MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
}
namesystem.checkOperation(OperationCategory.WRITE);
CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache, null);
CacheEntryWithPayload cacheEntry = getCacheEntryWithPayload(null);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return (HdfsFileStatus) cacheEntry.getPayload();
}
@ -832,8 +850,7 @@ public LastBlockWithStatus append(String src, String clientName,
+src+" for "+clientName+" at "+clientMachine);
}
namesystem.checkOperation(OperationCategory.WRITE);
CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache,
null);
CacheEntryWithPayload cacheEntry = getCacheEntryWithPayload(null);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return (LastBlockWithStatus) cacheEntry.getPayload();
}
@ -999,7 +1016,7 @@ public void updatePipeline(String clientName, ExtendedBlock oldBlock,
throws IOException {
checkNNStartup();
namesystem.checkOperation(OperationCategory.WRITE);
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
CacheEntry cacheEntry = getCacheEntry();
if (cacheEntry != null && cacheEntry.isSuccess()) {
return; // Return previous response
}
@ -1044,7 +1061,7 @@ public boolean rename(String src, String dst) throws IOException {
+ MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
}
namesystem.checkOperation(OperationCategory.WRITE);
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
CacheEntry cacheEntry = getCacheEntry();
if (cacheEntry != null && cacheEntry.isSuccess()) {
return true; // Return previous response
}
@ -1067,7 +1084,7 @@ public void concat(String trg, String[] src) throws IOException {
stateChangeLog.debug("*DIR* NameNode.concat: src path {} to" +
" target path {}", Arrays.toString(src), trg);
namesystem.checkOperation(OperationCategory.WRITE);
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
CacheEntry cacheEntry = getCacheEntry();
if (cacheEntry != null && cacheEntry.isSuccess()) {
return; // Return previous response
}
@ -1093,7 +1110,7 @@ public void rename2(String src, String dst, Options.Rename... options)
+ MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
}
namesystem.checkOperation(OperationCategory.WRITE);
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
CacheEntry cacheEntry = getCacheEntry();
if (cacheEntry != null && cacheEntry.isSuccess()) {
return; // Return previous response
}
@ -1130,7 +1147,7 @@ public boolean delete(String src, boolean recursive) throws IOException {
+ ", recursive=" + recursive);
}
namesystem.checkOperation(OperationCategory.WRITE);
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
CacheEntry cacheEntry = getCacheEntry();
if (cacheEntry != null && cacheEntry.isSuccess()) {
return true; // Return previous response
}
@ -1315,7 +1332,7 @@ public boolean restoreFailedStorage(String arg) throws IOException {
@Override // ClientProtocol
public boolean saveNamespace(long timeWindow, long txGap) throws IOException {
checkNNStartup();
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
CacheEntry cacheEntry = getCacheEntry();
if (cacheEntry != null && cacheEntry.isSuccess()) {
return true; // Return previous response
}
@ -1503,7 +1520,7 @@ public QuotaUsage getQuotaUsage(String path) throws IOException {
public void satisfyStoragePolicy(String src) throws IOException {
checkNNStartup();
namesystem.checkOperation(OperationCategory.WRITE);
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
CacheEntry cacheEntry = getCacheEntry();
if (cacheEntry != null && cacheEntry.isSuccess()) {
return; // Return previous response
}
@ -1550,7 +1567,7 @@ public void createSymlink(String target, String link, FsPermission dirPerms,
boolean createParent) throws IOException {
checkNNStartup();
namesystem.checkOperation(OperationCategory.WRITE);
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
CacheEntry cacheEntry = getCacheEntry();
if (cacheEntry != null && cacheEntry.isSuccess()) {
return; // Return previous response
}
@ -1920,34 +1937,11 @@ private void verifySoftwareVersion(DatanodeRegistration dnReg)
}
}
/**
* Get the actual client's machine.
*/
private String getClientMachine() {
if (ipProxyUsers != null) {
// Get the real user (or effective if it isn't a proxy user)
UserGroupInformation user =
UserGroupInformation.getRealUserOrSelf(Server.getRemoteUser());
if (user != null &&
ArrayUtils.contains(ipProxyUsers, user.getShortUserName())) {
CallerContext context = CallerContext.getCurrent();
if (context != null && context.isContextValid()) {
String cc = context.getContext();
// if the rpc has a caller context of "clientIp:1.2.3.4,CLI",
// return "1.2.3.4" as the client machine.
String key = CallerContext.CLIENT_IP_STR +
CallerContext.Builder.KEY_VALUE_SEPARATOR;
int posn = cc.indexOf(key);
if (posn != -1) {
posn += key.length();
int end = cc.indexOf(",", posn);
return end == -1 ? cc.substring(posn) : cc.substring(posn, end);
}
}
}
}
String clientMachine = Server.getRemoteAddress();
if (clientMachine == null) { //not a RPC client
clientMachine = "";
}
return clientMachine;
return NameNode.getClientMachine(this.ipProxyUsers);
}
@Override
@ -1967,8 +1961,7 @@ public String createSnapshot(String snapshotRoot, String snapshotName)
+ MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
}
namesystem.checkOperation(OperationCategory.WRITE);
CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache,
null);
CacheEntryWithPayload cacheEntry = getCacheEntryWithPayload(null);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return (String) cacheEntry.getPayload();
}
@ -1995,7 +1988,7 @@ public void deleteSnapshot(String snapshotRoot, String snapshotName)
}
namesystem.checkOperation(OperationCategory.WRITE);
metrics.incrDeleteSnapshotOps();
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
CacheEntry cacheEntry = getCacheEntry();
if (cacheEntry != null && cacheEntry.isSuccess()) {
return; // Return previous response
}
@ -2037,7 +2030,7 @@ public void renameSnapshot(String snapshotRoot, String snapshotOldName,
}
namesystem.checkOperation(OperationCategory.WRITE);
metrics.incrRenameSnapshotOps();
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
CacheEntry cacheEntry = getCacheEntry();
if (cacheEntry != null && cacheEntry.isSuccess()) {
return; // Return previous response
}
@ -2098,8 +2091,7 @@ public long addCacheDirective(
CacheDirectiveInfo path, EnumSet<CacheFlag> flags) throws IOException {
checkNNStartup();
namesystem.checkOperation(OperationCategory.WRITE);
CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion
(retryCache, null);
CacheEntryWithPayload cacheEntry = getCacheEntryWithPayload(null);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return (Long) cacheEntry.getPayload();
}
@ -2120,7 +2112,7 @@ public void modifyCacheDirective(
CacheDirectiveInfo directive, EnumSet<CacheFlag> flags) throws IOException {
checkNNStartup();
namesystem.checkOperation(OperationCategory.WRITE);
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
CacheEntry cacheEntry = getCacheEntry();
if (cacheEntry != null && cacheEntry.isSuccess()) {
return;
}
@ -2138,7 +2130,7 @@ public void modifyCacheDirective(
public void removeCacheDirective(long id) throws IOException {
checkNNStartup();
namesystem.checkOperation(OperationCategory.WRITE);
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
CacheEntry cacheEntry = getCacheEntry();
if (cacheEntry != null && cacheEntry.isSuccess()) {
return;
}
@ -2165,7 +2157,7 @@ public BatchedEntries<CacheDirectiveEntry> listCacheDirectives(long prevId,
public void addCachePool(CachePoolInfo info) throws IOException {
checkNNStartup();
namesystem.checkOperation(OperationCategory.WRITE);
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
CacheEntry cacheEntry = getCacheEntry();
if (cacheEntry != null && cacheEntry.isSuccess()) {
return; // Return previous response
}
@ -2182,7 +2174,7 @@ public void addCachePool(CachePoolInfo info) throws IOException {
public void modifyCachePool(CachePoolInfo info) throws IOException {
checkNNStartup();
namesystem.checkOperation(OperationCategory.WRITE);
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
CacheEntry cacheEntry = getCacheEntry();
if (cacheEntry != null && cacheEntry.isSuccess()) {
return; // Return previous response
}
@ -2199,7 +2191,7 @@ public void modifyCachePool(CachePoolInfo info) throws IOException {
public void removeCachePool(String cachePoolName) throws IOException {
checkNNStartup();
namesystem.checkOperation(OperationCategory.WRITE);
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
CacheEntry cacheEntry = getCacheEntry();
if (cacheEntry != null && cacheEntry.isSuccess()) {
return;
}
@ -2262,7 +2254,7 @@ public void createEncryptionZone(String src, String keyName)
throws IOException {
checkNNStartup();
namesystem.checkOperation(OperationCategory.WRITE);
final CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
final CacheEntry cacheEntry = getCacheEntry();
if (cacheEntry != null && cacheEntry.isSuccess()) {
return;
}
@ -2294,7 +2286,7 @@ public void reencryptEncryptionZone(final String zone,
final ReencryptAction action) throws IOException {
checkNNStartup();
namesystem.checkOperation(OperationCategory.WRITE);
final CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
final CacheEntry cacheEntry = getCacheEntry();
if (cacheEntry != null && cacheEntry.isSuccess()) {
return;
}
@ -2318,7 +2310,7 @@ public BatchedEntries<ZoneReencryptionStatus> listReencryptionStatus(
public void setErasureCodingPolicy(String src, String ecPolicyName)
throws IOException {
checkNNStartup();
final CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
final CacheEntry cacheEntry = getCacheEntry();
if (cacheEntry != null && cacheEntry.isSuccess()) {
return;
}
@ -2342,7 +2334,7 @@ public void setXAttr(String src, XAttr xAttr, EnumSet<XAttrSetFlag> flag)
throws IOException {
checkNNStartup();
namesystem.checkOperation(OperationCategory.WRITE);
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
CacheEntry cacheEntry = getCacheEntry();
if (cacheEntry != null && cacheEntry.isSuccess()) {
return; // Return previous response
}
@ -2372,7 +2364,7 @@ public List<XAttr> listXAttrs(String src) throws IOException {
public void removeXAttr(String src, XAttr xAttr) throws IOException {
checkNNStartup();
namesystem.checkOperation(OperationCategory.WRITE);
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
CacheEntry cacheEntry = getCacheEntry();
if (cacheEntry != null && cacheEntry.isSuccess()) {
return; // Return previous response
}
@ -2555,7 +2547,7 @@ public ErasureCodingPolicy getErasureCodingPolicy(String src) throws IOException
@Override // ClientProtocol
public void unsetErasureCodingPolicy(String src) throws IOException {
checkNNStartup();
final CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
final CacheEntry cacheEntry = getCacheEntry();
if (cacheEntry != null && cacheEntry.isSuccess()) {
return;
}
@ -2581,8 +2573,7 @@ public AddErasureCodingPolicyResponse[] addErasureCodingPolicies(
String operationName = "addErasureCodingPolicies";
checkNNStartup();
namesystem.checkSuperuserPrivilege(operationName);
final CacheEntryWithPayload cacheEntry =
RetryCache.waitForCompletion(retryCache, null);
final CacheEntryWithPayload cacheEntry = getCacheEntryWithPayload(null);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return (AddErasureCodingPolicyResponse[]) cacheEntry.getPayload();
}
@ -2605,7 +2596,7 @@ public void removeErasureCodingPolicy(String ecPolicyName)
String operationName = "removeErasureCodingPolicy";
checkNNStartup();
namesystem.checkSuperuserPrivilege(operationName);
final CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
final CacheEntry cacheEntry = getCacheEntry();
if (cacheEntry != null && cacheEntry.isSuccess()) {
return;
}
@ -2624,7 +2615,7 @@ public void enableErasureCodingPolicy(String ecPolicyName)
String operationName = "enableErasureCodingPolicy";
checkNNStartup();
namesystem.checkSuperuserPrivilege(operationName);
final CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
final CacheEntry cacheEntry = getCacheEntry();
if (cacheEntry != null && cacheEntry.isSuccess()) {
return;
}
@ -2643,7 +2634,7 @@ public void disableErasureCodingPolicy(String ecPolicyName)
String operationName = "disableErasureCodingPolicy";
checkNNStartup();
namesystem.checkSuperuserPrivilege(operationName);
final CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
final CacheEntry cacheEntry = getCacheEntry();
if (cacheEntry != null && cacheEntry.isSuccess()) {
return;
}