HDFS-14230. RBF: Throw RetriableException instead of IOException when no namenodes available. Contributed by Fei Hui.
This commit is contained in:
parent
912b90f91e
commit
7e63e37dc5
@ -46,6 +46,8 @@ public interface FederationRPCMBean {
|
|||||||
|
|
||||||
long getProxyOpRetries();
|
long getProxyOpRetries();
|
||||||
|
|
||||||
|
long getProxyOpNoNamenodes();
|
||||||
|
|
||||||
long getRouterFailureStateStoreOps();
|
long getRouterFailureStateStoreOps();
|
||||||
|
|
||||||
long getRouterFailureReadOnlyOps();
|
long getRouterFailureReadOnlyOps();
|
||||||
|
@ -60,6 +60,8 @@ public class FederationRPCMetrics implements FederationRPCMBean {
|
|||||||
private MutableCounterLong proxyOpNotImplemented;
|
private MutableCounterLong proxyOpNotImplemented;
|
||||||
@Metric("Number of operation retries")
|
@Metric("Number of operation retries")
|
||||||
private MutableCounterLong proxyOpRetries;
|
private MutableCounterLong proxyOpRetries;
|
||||||
|
@Metric("Number of operations to hit no namenodes available")
|
||||||
|
private MutableCounterLong proxyOpNoNamenodes;
|
||||||
|
|
||||||
@Metric("Failed requests due to State Store unavailable")
|
@Metric("Failed requests due to State Store unavailable")
|
||||||
private MutableCounterLong routerFailureStateStore;
|
private MutableCounterLong routerFailureStateStore;
|
||||||
@ -138,6 +140,15 @@ public long getProxyOpRetries() {
|
|||||||
return proxyOpRetries.value();
|
return proxyOpRetries.value();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void incrProxyOpNoNamenodes() {
|
||||||
|
proxyOpNoNamenodes.incr();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getProxyOpNoNamenodes() {
|
||||||
|
return proxyOpNoNamenodes.value();
|
||||||
|
}
|
||||||
|
|
||||||
public void incrRouterFailureStateStore() {
|
public void incrRouterFailureStateStore() {
|
||||||
routerFailureStateStore.incr();
|
routerFailureStateStore.incr();
|
||||||
}
|
}
|
||||||
|
@ -170,6 +170,11 @@ public void proxyOpRetries() {
|
|||||||
metrics.incrProxyOpRetries();
|
metrics.incrProxyOpRetries();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void proxyOpNoNamenodes() {
|
||||||
|
metrics.incrProxyOpNoNamenodes();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void routerFailureStateStore() {
|
public void routerFailureStateStore() {
|
||||||
metrics.incrRouterFailureStateStore();
|
metrics.incrRouterFailureStateStore();
|
||||||
|
@ -0,0 +1,33 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.server.federation.router;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Exception when no namenodes are available.
|
||||||
|
*/
|
||||||
|
public class NoNamenodesAvailableException extends IOException {
|
||||||
|
|
||||||
|
private static final long serialVersionUID = 1L;
|
||||||
|
|
||||||
|
public NoNamenodesAvailableException(String nsId, IOException ioe) {
|
||||||
|
super("No namenodes available under nameservice " + nsId, ioe);
|
||||||
|
}
|
||||||
|
}
|
@ -61,6 +61,7 @@
|
|||||||
import org.apache.hadoop.io.retry.RetryPolicy;
|
import org.apache.hadoop.io.retry.RetryPolicy;
|
||||||
import org.apache.hadoop.io.retry.RetryPolicy.RetryAction.RetryDecision;
|
import org.apache.hadoop.io.retry.RetryPolicy.RetryAction.RetryDecision;
|
||||||
import org.apache.hadoop.ipc.RemoteException;
|
import org.apache.hadoop.ipc.RemoteException;
|
||||||
|
import org.apache.hadoop.ipc.RetriableException;
|
||||||
import org.apache.hadoop.ipc.StandbyException;
|
import org.apache.hadoop.ipc.StandbyException;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
@ -302,8 +303,8 @@ private static IOException toIOException(Exception e) {
|
|||||||
* @param retryCount Number of retries.
|
* @param retryCount Number of retries.
|
||||||
* @param nsId Nameservice ID.
|
* @param nsId Nameservice ID.
|
||||||
* @return Retry decision.
|
* @return Retry decision.
|
||||||
* @throws IOException Original exception if the retry policy generates one
|
* @throws NoNamenodesAvailableException Exception that the retry policy
|
||||||
* or IOException for no available namenodes.
|
* generates for no available namenodes.
|
||||||
*/
|
*/
|
||||||
private RetryDecision shouldRetry(final IOException ioe, final int retryCount,
|
private RetryDecision shouldRetry(final IOException ioe, final int retryCount,
|
||||||
final String nsId) throws IOException {
|
final String nsId) throws IOException {
|
||||||
@ -313,8 +314,7 @@ private RetryDecision shouldRetry(final IOException ioe, final int retryCount,
|
|||||||
if (retryCount == 0) {
|
if (retryCount == 0) {
|
||||||
return RetryDecision.RETRY;
|
return RetryDecision.RETRY;
|
||||||
} else {
|
} else {
|
||||||
throw new IOException("No namenode available under nameservice " + nsId,
|
throw new NoNamenodesAvailableException(nsId, ioe);
|
||||||
ioe);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -405,6 +405,14 @@ private Object invokeMethod(
|
|||||||
StandbyException se = new StandbyException(ioe.getMessage());
|
StandbyException se = new StandbyException(ioe.getMessage());
|
||||||
se.initCause(ioe);
|
se.initCause(ioe);
|
||||||
throw se;
|
throw se;
|
||||||
|
} else if (ioe instanceof NoNamenodesAvailableException) {
|
||||||
|
if (this.rpcMonitor != null) {
|
||||||
|
this.rpcMonitor.proxyOpNoNamenodes();
|
||||||
|
}
|
||||||
|
LOG.error("Can not get available namenode for {} {} error: {}",
|
||||||
|
nsId, rpcAddress, ioe.getMessage());
|
||||||
|
// Throw RetriableException so that client can retry
|
||||||
|
throw new RetriableException(ioe);
|
||||||
} else {
|
} else {
|
||||||
// Other communication error, this is a failure
|
// Other communication error, this is a failure
|
||||||
// Communication retries are handled by the retry policy
|
// Communication retries are handled by the retry policy
|
||||||
|
@ -92,6 +92,11 @@ void init(
|
|||||||
*/
|
*/
|
||||||
void proxyOpRetries();
|
void proxyOpRetries();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Failed to proxy an operation because of no namenodes available.
|
||||||
|
*/
|
||||||
|
void proxyOpNoNamenodes();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* If the Router cannot contact the State Store in an operation.
|
* If the Router cannot contact the State Store in an operation.
|
||||||
*/
|
*/
|
||||||
|
@ -48,6 +48,7 @@
|
|||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
|
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
|
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.NamenodeContext;
|
||||||
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
|
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
|
||||||
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
|
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
|
||||||
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
|
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
|
||||||
@ -374,4 +375,41 @@ public Object answer(InvocationOnMock invocation) throws Throwable {
|
|||||||
Whitebox.setInternalState(rpcClient, "connectionManager",
|
Whitebox.setInternalState(rpcClient, "connectionManager",
|
||||||
spyConnectionManager);
|
spyConnectionManager);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Switch namenodes of all hdfs name services to standby.
|
||||||
|
* @param cluster a federated HDFS cluster
|
||||||
|
*/
|
||||||
|
public static void transitionClusterNSToStandby(
|
||||||
|
StateStoreDFSCluster cluster) {
|
||||||
|
// Name services of the cluster
|
||||||
|
List<String> nameServiceList = cluster.getNameservices();
|
||||||
|
|
||||||
|
// Change namenodes of each name service to standby
|
||||||
|
for (String nameService : nameServiceList) {
|
||||||
|
List<NamenodeContext> nnList = cluster.getNamenodes(nameService);
|
||||||
|
for(NamenodeContext namenodeContext : nnList) {
|
||||||
|
cluster.switchToStandby(nameService, namenodeContext.getNamenodeId());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Switch the index namenode of all hdfs name services to active.
|
||||||
|
* @param cluster a federated HDFS cluster
|
||||||
|
* @param index the index of namenodes
|
||||||
|
*/
|
||||||
|
public static void transitionClusterNSToActive(
|
||||||
|
StateStoreDFSCluster cluster, int index) {
|
||||||
|
// Name services of the cluster
|
||||||
|
List<String> nameServiceList = cluster.getNameservices();
|
||||||
|
|
||||||
|
// Change the index namenode of each name service to active
|
||||||
|
for (String nameService : nameServiceList) {
|
||||||
|
List<NamenodeContext> listNamenodeContext =
|
||||||
|
cluster.getNamenodes(nameService);
|
||||||
|
cluster.switchToActive(nameService,
|
||||||
|
listNamenodeContext.get(index).getNamenodeId());
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -19,6 +19,8 @@
|
|||||||
|
|
||||||
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.simulateSlowNamenode;
|
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.simulateSlowNamenode;
|
||||||
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.simulateThrowExceptionRouterRpcServer;
|
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.simulateThrowExceptionRouterRpcServer;
|
||||||
|
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.transitionClusterNSToStandby;
|
||||||
|
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.transitionClusterNSToActive;
|
||||||
import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains;
|
import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
@ -27,6 +29,7 @@
|
|||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collection;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
@ -46,7 +49,9 @@
|
|||||||
import org.apache.hadoop.ipc.RemoteException;
|
import org.apache.hadoop.ipc.RemoteException;
|
||||||
import org.apache.hadoop.ipc.StandbyException;
|
import org.apache.hadoop.ipc.StandbyException;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
|
import org.junit.Rule;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
import org.junit.rules.ExpectedException;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
@ -71,14 +76,19 @@ public void cleanup() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void setupCluster(boolean overloadControl) throws Exception {
|
@Rule
|
||||||
|
public ExpectedException exceptionRule = ExpectedException.none();
|
||||||
|
|
||||||
|
private void setupCluster(boolean overloadControl, boolean ha)
|
||||||
|
throws Exception {
|
||||||
// Build and start a federated cluster
|
// Build and start a federated cluster
|
||||||
cluster = new StateStoreDFSCluster(false, 2);
|
cluster = new StateStoreDFSCluster(ha, 2);
|
||||||
Configuration routerConf = new RouterConfigBuilder()
|
Configuration routerConf = new RouterConfigBuilder()
|
||||||
.stateStore()
|
.stateStore()
|
||||||
.metrics()
|
.metrics()
|
||||||
.admin()
|
.admin()
|
||||||
.rpc()
|
.rpc()
|
||||||
|
.heartbeat()
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
// Reduce the number of RPC clients threads to overload the Router easy
|
// Reduce the number of RPC clients threads to overload the Router easy
|
||||||
@ -98,7 +108,7 @@ private void setupCluster(boolean overloadControl) throws Exception {
|
|||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testWithoutOverloadControl() throws Exception {
|
public void testWithoutOverloadControl() throws Exception {
|
||||||
setupCluster(false);
|
setupCluster(false, false);
|
||||||
|
|
||||||
// Nobody should get overloaded
|
// Nobody should get overloaded
|
||||||
testOverloaded(0);
|
testOverloaded(0);
|
||||||
@ -121,7 +131,7 @@ public void testWithoutOverloadControl() throws Exception {
|
|||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testOverloadControl() throws Exception {
|
public void testOverloadControl() throws Exception {
|
||||||
setupCluster(true);
|
setupCluster(true, false);
|
||||||
|
|
||||||
List<RouterContext> routers = cluster.getRouters();
|
List<RouterContext> routers = cluster.getRouters();
|
||||||
FederationRPCMetrics rpcMetrics0 =
|
FederationRPCMetrics rpcMetrics0 =
|
||||||
@ -244,7 +254,7 @@ public void run() {
|
|||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testConnectionNullException() throws Exception {
|
public void testConnectionNullException() throws Exception {
|
||||||
setupCluster(false);
|
setupCluster(false, false);
|
||||||
|
|
||||||
// Choose 1st router
|
// Choose 1st router
|
||||||
RouterContext routerContext = cluster.getRouters().get(0);
|
RouterContext routerContext = cluster.getRouters().get(0);
|
||||||
@ -280,4 +290,70 @@ public void testConnectionNullException() throws Exception {
|
|||||||
assertEquals(originalRouter1Failures,
|
assertEquals(originalRouter1Failures,
|
||||||
rpcMetrics1.getProxyOpFailureCommunicate());
|
rpcMetrics1.getProxyOpFailureCommunicate());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* When failover occurs, no namenodes are available within a short time.
|
||||||
|
* Client will success after some retries.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testNoNamenodesAvailable() throws Exception{
|
||||||
|
setupCluster(false, true);
|
||||||
|
|
||||||
|
transitionClusterNSToStandby(cluster);
|
||||||
|
|
||||||
|
Configuration conf = cluster.getRouterClientConf();
|
||||||
|
// Set dfs.client.failover.random.order false, to pick 1st router at first
|
||||||
|
conf.setBoolean("dfs.client.failover.random.order", false);
|
||||||
|
|
||||||
|
// Retries is 3 (see FailoverOnNetworkExceptionRetry#shouldRetry, will fail
|
||||||
|
// when reties > max.attempts), so total access is 4.
|
||||||
|
conf.setInt("dfs.client.retry.max.attempts", 2);
|
||||||
|
DFSClient routerClient = new DFSClient(new URI("hdfs://fed"), conf);
|
||||||
|
|
||||||
|
// Get router0 metrics
|
||||||
|
FederationRPCMetrics rpcMetrics0 = cluster.getRouters().get(0)
|
||||||
|
.getRouter().getRpcServer().getRPCMetrics();
|
||||||
|
// Get router1 metrics
|
||||||
|
FederationRPCMetrics rpcMetrics1 = cluster.getRouters().get(1)
|
||||||
|
.getRouter().getRpcServer().getRPCMetrics();
|
||||||
|
|
||||||
|
// Original failures
|
||||||
|
long originalRouter0Failures = rpcMetrics0.getProxyOpNoNamenodes();
|
||||||
|
long originalRouter1Failures = rpcMetrics1.getProxyOpNoNamenodes();
|
||||||
|
|
||||||
|
// GetFileInfo will throw Exception
|
||||||
|
String exceptionMessage = "org.apache.hadoop.hdfs.server.federation."
|
||||||
|
+ "router.NoNamenodesAvailableException: No namenodes available "
|
||||||
|
+ "under nameservice ns0";
|
||||||
|
exceptionRule.expect(RemoteException.class);
|
||||||
|
exceptionRule.expectMessage(exceptionMessage);
|
||||||
|
routerClient.getFileInfo("/");
|
||||||
|
|
||||||
|
// Router 0 failures will increase
|
||||||
|
assertEquals(originalRouter0Failures + 4,
|
||||||
|
rpcMetrics0.getProxyOpNoNamenodes());
|
||||||
|
// Router 1 failures do not change
|
||||||
|
assertEquals(originalRouter1Failures,
|
||||||
|
rpcMetrics1.getProxyOpNoNamenodes());
|
||||||
|
|
||||||
|
// Make name services available
|
||||||
|
transitionClusterNSToActive(cluster, 0);
|
||||||
|
for (RouterContext routerContext : cluster.getRouters()) {
|
||||||
|
// Manually trigger the heartbeat
|
||||||
|
Collection<NamenodeHeartbeatService> heartbeatServices = routerContext
|
||||||
|
.getRouter().getNamenodeHearbeatServices();
|
||||||
|
for (NamenodeHeartbeatService service : heartbeatServices) {
|
||||||
|
service.periodicInvoke();
|
||||||
|
}
|
||||||
|
// Update service cache
|
||||||
|
routerContext.getRouter().getStateStore().refreshCaches(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
originalRouter0Failures = rpcMetrics0.getProxyOpNoNamenodes();
|
||||||
|
|
||||||
|
// RPC call must be successful
|
||||||
|
routerClient.getFileInfo("/");
|
||||||
|
// Router 0 failures do not change
|
||||||
|
assertEquals(originalRouter0Failures, rpcMetrics0.getProxyOpNoNamenodes());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -133,7 +133,7 @@ public void testRetryWhenAllNameServiceDown() throws Exception {
|
|||||||
} catch (RemoteException e) {
|
} catch (RemoteException e) {
|
||||||
String ns0 = cluster.getNameservices().get(0);
|
String ns0 = cluster.getNameservices().get(0);
|
||||||
assertExceptionContains(
|
assertExceptionContains(
|
||||||
"No namenode available under nameservice " + ns0, e);
|
"No namenodes available under nameservice " + ns0, e);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Verify the retry times, it should only retry one time.
|
// Verify the retry times, it should only retry one time.
|
||||||
|
Loading…
Reference in New Issue
Block a user