diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/LossyRetryInvocationHandler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/LossyRetryInvocationHandler.java
new file mode 100644
index 0000000000..df5895553a
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/LossyRetryInvocationHandler.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.retry;
+
+import java.lang.reflect.Method;
+import java.net.UnknownHostException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * A dummy invocation handler extending RetryInvocationHandler. It drops the
+ * first N responses. This invocation handler is only used for testing.
+ */
+@InterfaceAudience.Private
+public class LossyRetryInvocationHandler extends RetryInvocationHandler {
+  private final int numToDrop;
+  private static final ThreadLocal<Integer> RetryCount =
+      new ThreadLocal<Integer>();
+
+  public LossyRetryInvocationHandler(int numToDrop,
+      FailoverProxyProvider proxyProvider, RetryPolicy retryPolicy) {
+    super(proxyProvider, retryPolicy);
+    this.numToDrop = numToDrop;
+  }
+
+  @Override
+  public Object invoke(Object proxy, Method method, Object[] args)
+      throws Throwable {
+    RetryCount.set(0);
+    return super.invoke(proxy, method, args);
+  }
+
+  @Override
+  protected Object invokeMethod(Method method, Object[] args) throws Throwable {
+    Object result = super.invokeMethod(method, args);
+    int retryCount = RetryCount.get();
+    if (retryCount < this.numToDrop) {
+      RetryCount.set(++retryCount);
+      LOG.info("Drop the response. Current retryCount == " + retryCount);
+      throw new UnknownHostException("Fake Exception");
+    } else {
+      LOG.info("retryCount == " + retryCount +
+          ". It's time to normally process the response");
+      return result;
+    }
+  }
+}
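The handler above is the core of the patch: every RPC still reaches the server, but the first numToDrop responses are discarded client-side by throwing a retriable exception, so the retry machinery resends the call and the NameNode retry cache gets exercised. The same drop-then-retry pattern can be sketched in isolation with plain JDK dynamic proxies; the demo below is illustrative only (Echo, LossyProxyDemo, and the naive retry loop are hypothetical stand-ins, not part of this patch):

    import java.lang.reflect.InvocationHandler;
    import java.lang.reflect.Method;
    import java.lang.reflect.Proxy;
    import java.net.UnknownHostException;

    public class LossyProxyDemo {
      interface Echo { String echo(String s) throws Exception; }

      public static void main(String[] args) throws Exception {
        final Echo real = s -> "echo: " + s;   // stands in for the server
        final int numToDrop = 2;
        InvocationHandler lossy = new InvocationHandler() {
          private int dropped = 0;             // single-threaded demo counter
          @Override
          public Object invoke(Object proxy, Method m, Object[] a) throws Throwable {
            Object result = m.invoke(real, a); // the call reaches the target...
            if (dropped < numToDrop) {
              dropped++;                       // ...but the response is discarded,
              throw new UnknownHostException("Fake Exception " + dropped);
            }
            return result;                     // after N drops, pass it through
          }
        };
        Echo echo = (Echo) Proxy.newProxyInstance(Echo.class.getClassLoader(),
            new Class<?>[] { Echo.class }, lossy);
        // Naive retry loop standing in for RetryInvocationHandler's RetryPolicy.
        for (int attempt = 1; attempt <= numToDrop + 1; attempt++) {
          try {
            System.out.println(echo.echo("hi")); // succeeds on attempt 3
            break;
          } catch (Exception e) {
            System.out.println("attempt " + attempt + " dropped: " + e.getMessage());
          }
        }
      }
    }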
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java
index 974bac91eb..51dd46a8f9 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java
@@ -63,7 +63,7 @@ protected RetryInvocationHandler(FailoverProxyProvider proxyProvider,
     this(proxyProvider, retryPolicy, Collections.<String, RetryPolicy>emptyMap());
   }
 
-  RetryInvocationHandler(FailoverProxyProvider proxyProvider,
+  protected RetryInvocationHandler(FailoverProxyProvider proxyProvider,
       RetryPolicy defaultPolicy,
       Map<String, RetryPolicy> methodNameToPolicyMap) {
     this.proxyProvider = proxyProvider;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index dd445c2fd9..dd86e0e4e4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -244,6 +244,9 @@ Release 2.3.0 - UNRELEASED
 
   NEW FEATURES
 
+    HDFS-5118. Provide testing support for DFSClient to drop RPC responses.
+    (jing9)
+
   IMPROVEMENTS
 
     HDFS-4657. Limit the number of blocks logged by the NN after a block
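Widening the three-argument constructor from package-private to protected is what lets subclasses reuse RetryInvocationHandler with a per-method policy map. A hedged sketch of the subclassing pattern the wider visibility permits (CountingRetryHandler and its package are hypothetical; only the super constructor and invokeMethod signatures come from this patch):

    package org.example.retry; // hypothetical package, outside org.apache.hadoop.io.retry

    import java.lang.reflect.Method;
    import java.util.Map;

    import org.apache.hadoop.io.retry.FailoverProxyProvider;
    import org.apache.hadoop.io.retry.RetryInvocationHandler;
    import org.apache.hadoop.io.retry.RetryPolicy;

    // Counts every attempt (including retries) before delegating. Before this
    // patch the three-arg super constructor was package-private, so a subclass
    // like this one could not live outside org.apache.hadoop.io.retry.
    public class CountingRetryHandler extends RetryInvocationHandler {
      private long attempts = 0;

      public CountingRetryHandler(FailoverProxyProvider proxyProvider,
          RetryPolicy defaultPolicy, Map<String, RetryPolicy> methodNameToPolicyMap) {
        super(proxyProvider, defaultPolicy, methodNameToPolicyMap);
      }

      @Override
      protected Object invokeMethod(Method method, Object[] args) throws Throwable {
        attempts++; // one increment per actual RPC attempt
        return super.invokeMethod(method, args);
      }
    }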
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index 511df17b47..d1cc784891 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -27,6 +27,9 @@
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_RETRIES_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_READS;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_WRITES;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_READAHEAD;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_DEFAULT;
@@ -44,9 +47,6 @@
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_READS;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_WRITES;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_READAHEAD;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
@@ -100,6 +100,7 @@
 import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.Options.ChecksumOpt;
 import org.apache.hadoop.fs.ParentNotDirectoryException;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.fs.VolumeId;
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -113,13 +114,13 @@
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
-import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
+import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
 import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
 import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
@@ -144,6 +145,7 @@
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.MD5Hash;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.retry.LossyRetryInvocationHandler;
 import org.apache.hadoop.ipc.Client;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
@@ -451,7 +453,11 @@ public DFSClient(URI nameNodeUri, Configuration conf,
 
   /**
    * Create a new DFSClient connected to the given nameNodeUri or rpcNamenode.
-   * Exactly one of nameNodeUri or rpcNamenode must be null.
+   * If HA is enabled and a positive value is set for
+   * {@link DFSConfigKeys#DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY} in the
+   * configuration, the DFSClient will use {@link LossyRetryInvocationHandler}
+   * as its RetryInvocationHandler. Otherwise exactly one of nameNodeUri or
+   * rpcNamenode must be null.
    */
   @VisibleForTesting
   public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode,
@@ -475,7 +481,20 @@ public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode,
     this.clientName = "DFSClient_" + dfsClientConf.taskId + "_" +
         DFSUtil.getRandom().nextInt() + "_" + Thread.currentThread().getId();
 
-    if (rpcNamenode != null) {
+    int numResponseToDrop = conf.getInt(
+        DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY,
+        DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_DEFAULT);
+    if (numResponseToDrop > 0) {
+      // This case is used for testing.
+      LOG.warn(DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY
+          + " is set to " + numResponseToDrop
+          + ", this hacked client will proactively drop responses");
+      NameNodeProxies.ProxyAndInfo<ClientProtocol> proxyInfo = NameNodeProxies
+          .createProxyWithLossyRetryHandler(conf, nameNodeUri,
+              ClientProtocol.class, numResponseToDrop);
+      this.dtService = proxyInfo.getDelegationTokenService();
+      this.namenode = proxyInfo.getProxy();
+    } else if (rpcNamenode != null) {
       // This case is used for testing.
       Preconditions.checkArgument(nameNodeUri == null);
       this.namenode = rpcNamenode;
@@ -514,7 +533,7 @@ public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode,
     this.defaultWriteCachingStrategy =
         new CachingStrategy(writeDropBehind, readahead);
   }
-  
+
   /**
    * Return the socket addresses to use with each configured
    * local interface. Local interfaces may be specified by IP
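With the constructor change above, response dropping is enabled purely through configuration and takes precedence over the other rpcNamenode/nameNodeUri cases. A hedged sketch of how a retry-cache test might use it (the HA Configuration is assumed to come from elsewhere, e.g. a MiniDFSCluster with two NameNodes; "logical-ns" is a hypothetical nameservice):

    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSClient;
    import org.apache.hadoop.hdfs.DFSConfigKeys;

    public class RetryCacheSmokeTest {
      // conf must already describe an HA nameservice; "logical-ns" is hypothetical.
      static void exerciseRetryCache(Configuration conf) throws Exception {
        // Drop the first 2 responses of every RPC: each call is then retried,
        // and the NameNode retry cache must recognize the repeated request.
        conf.setInt(DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY, 2);
        DFSClient client = new DFSClient(URI.create("hdfs://logical-ns"), conf);
        try {
          client.mkdirs("/testRetryCache", null, true); // succeeds despite drops
        } finally {
          client.close();
        }
      }
    }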
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index a66ec93961..b4d67ca19d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -497,6 +497,12 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final long DFS_NAMENODE_RETRY_CACHE_EXPIRYTIME_MILLIS_DEFAULT = 600000; // 10 minutes
   public static final String DFS_NAMENODE_RETRY_CACHE_HEAP_PERCENT_KEY = "dfs.namenode.retrycache.heap.percent";
   public static final float DFS_NAMENODE_RETRY_CACHE_HEAP_PERCENT_DEFAULT = 0.03f;
+
+  // The number of NN responses the client proactively drops in each RPC call.
+  // For testing the NN retry cache, set this property to a positive value.
+  public static final String DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY = "dfs.client.test.drop.namenode.response.number";
+  public static final int DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_DEFAULT = 0;
+
 
   // Hidden configuration undocumented in hdfs-site.xml
   // Timeout to wait for block receiver and responder thread to stop
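Since the key sits alongside the other client settings, a test can also enable it from a configuration resource rather than programmatically. An illustrative fragment for a test's hdfs-site.xml (the value 2 and the description are examples, not part of this patch; the property is intentionally undocumented for production use):

    <property>
      <name>dfs.client.test.drop.namenode.response.number</name>
      <value>2</value>
      <description>Test-only: number of NN responses the client drops per
      RPC call; 0 (the default) disables dropping.</description>
    </property>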
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java
index eb745b8bb7..41dac6a80f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java
@@ -17,10 +17,18 @@
  */
 package org.apache.hadoop.hdfs;
 
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_KEY;
 
 import java.io.IOException;
 import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Proxy;
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.util.HashMap;
@@ -48,6 +56,7 @@
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.retry.DefaultFailoverProxyProvider;
 import org.apache.hadoop.io.retry.FailoverProxyProvider;
+import org.apache.hadoop.io.retry.LossyRetryInvocationHandler;
 import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.io.retry.RetryProxy;
@@ -144,6 +153,61 @@ public static <T> ProxyAndInfo<T> createProxy(Configuration conf,
       return new ProxyAndInfo<T>(proxy, dtService);
     }
   }
+
+  /**
+   * Generate a dummy namenode proxy instance that utilizes our hacked
+   * {@link LossyRetryInvocationHandler}. A proxy instance generated by this
+   * method will proactively drop RPC responses. Currently this method only
+   * supports HA setups; IllegalStateException will be thrown if the given
+   * configuration is not for HA.
+   *
+   * @param config the configuration containing the required IPC
+   *        properties, client failover configurations, etc.
+   * @param nameNodeUri the URI pointing either to a specific NameNode
+   *        or to a logical nameservice.
+   * @param xface the IPC interface which should be created
+   * @param numResponseToDrop The number of responses to drop for each RPC call
+   * @return an object containing both the proxy and the associated
+   *         delegation token service it corresponds to
+   * @throws IOException if there is an error creating the proxy
+   */
+  @SuppressWarnings("unchecked")
+  public static <T> ProxyAndInfo<T> createProxyWithLossyRetryHandler(
+      Configuration config, URI nameNodeUri, Class<T> xface,
+      int numResponseToDrop) throws IOException {
+    Preconditions.checkArgument(numResponseToDrop > 0);
+    Class<FailoverProxyProvider<T>> failoverProxyProviderClass =
+        getFailoverProxyProviderClass(config, nameNodeUri, xface);
+    if (failoverProxyProviderClass != null) { // HA case
+      FailoverProxyProvider<T> failoverProxyProvider =
+          createFailoverProxyProvider(config, failoverProxyProviderClass,
+              xface, nameNodeUri);
+      int delay = config.getInt(
+          DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_KEY,
+          DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_DEFAULT);
+      int maxCap = config.getInt(
+          DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_KEY,
+          DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_DEFAULT);
+      int maxFailoverAttempts = config.getInt(
+          DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY,
+          DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT);
+      InvocationHandler dummyHandler = new LossyRetryInvocationHandler(
+          numResponseToDrop, failoverProxyProvider,
+          RetryPolicies.failoverOnNetworkException(
+              RetryPolicies.TRY_ONCE_THEN_FAIL,
+              Math.max(numResponseToDrop + 1, maxFailoverAttempts), delay,
+              maxCap));
+
+      T proxy = (T) Proxy.newProxyInstance(
+          failoverProxyProvider.getInterface().getClassLoader(),
+          new Class[] { xface }, dummyHandler);
+      Text dtService = HAUtil.buildTokenServiceForLogicalUri(nameNodeUri);
+      return new ProxyAndInfo<T>(proxy, dtService);
+    } else {
+      throw new IllegalStateException("Currently creating proxy using " +
+          "LossyRetryInvocationHandler requires NN HA setup");
+    }
+  }
 
   /**
    * Creates an explicitly non-HA-enabled proxy object. Most of the time you
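Note the retry-policy bound in the method above: Math.max(numResponseToDrop + 1, maxFailoverAttempts) guarantees at least numResponseToDrop + 1 attempts, since each dropped response consumes one retry and the first response that is not dropped can only arrive on a later attempt. A hedged sketch of calling the new factory directly (conf must describe an HA nameservice, otherwise the method throws IllegalStateException; "logical-ns" is hypothetical):

    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.NameNodeProxies;
    import org.apache.hadoop.hdfs.protocol.ClientProtocol;

    public class LossyProxyFactoryDemo {
      static ClientProtocol lossyNamenode(Configuration conf) throws Exception {
        // Drop the first 3 responses of every call: with the Math.max bound
        // above, the policy allows at least 4 attempts, so the 4th response
        // (the first one not dropped) is handed back to the caller.
        NameNodeProxies.ProxyAndInfo<ClientProtocol> info =
            NameNodeProxies.createProxyWithLossyRetryHandler(
                conf, URI.create("hdfs://logical-ns"), ClientProtocol.class, 3);
        return info.getProxy();
      }
    }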