HDFS-13924. [SBN read] Handle BlockMissingException when reading from observer. Contributed by Chao Sun.
This commit is contained in:
parent
b6f20c36c2
commit
8c49135078
@ -0,0 +1,35 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.ipc;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Thrown by a remote ObserverNode indicating the operation has failed and the
|
||||||
|
* client should retry active namenode directly (instead of retry other
|
||||||
|
* ObserverNodes).
|
||||||
|
*/
|
||||||
|
@InterfaceStability.Evolving
|
||||||
|
public class ObserverRetryOnActiveException extends IOException {
|
||||||
|
static final long serialVersionUID = 1L;
|
||||||
|
public ObserverRetryOnActiveException(String msg) {
|
||||||
|
super(msg);
|
||||||
|
}
|
||||||
|
}
|
@ -37,7 +37,9 @@
|
|||||||
import org.apache.hadoop.io.retry.RetryPolicy;
|
import org.apache.hadoop.io.retry.RetryPolicy;
|
||||||
import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
|
import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
|
||||||
import org.apache.hadoop.ipc.AlignmentContext;
|
import org.apache.hadoop.ipc.AlignmentContext;
|
||||||
|
import org.apache.hadoop.ipc.ObserverRetryOnActiveException;
|
||||||
import org.apache.hadoop.ipc.RPC;
|
import org.apache.hadoop.ipc.RPC;
|
||||||
|
import org.apache.hadoop.ipc.RemoteException;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
@ -263,6 +265,16 @@ public Object invoke(Object proxy, final Method method, final Object[] args)
|
|||||||
throw ite.getCause();
|
throw ite.getCause();
|
||||||
}
|
}
|
||||||
Exception e = (Exception) ite.getCause();
|
Exception e = (Exception) ite.getCause();
|
||||||
|
if (e instanceof RemoteException) {
|
||||||
|
RemoteException re = (RemoteException) e;
|
||||||
|
Exception unwrapped = re.unwrapRemoteException(
|
||||||
|
ObserverRetryOnActiveException.class);
|
||||||
|
if (unwrapped instanceof ObserverRetryOnActiveException) {
|
||||||
|
LOG.info("Encountered ObserverRetryOnActiveException from {}." +
|
||||||
|
" Retry active namenode directly.", current.proxyInfo);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
RetryAction retryInfo = observerRetryPolicy.shouldRetry(e, 0, 0,
|
RetryAction retryInfo = observerRetryPolicy.shouldRetry(e, 0, 0,
|
||||||
method.isAnnotationPresent(Idempotent.class)
|
method.isAnnotationPresent(Idempotent.class)
|
||||||
|| method.isAnnotationPresent(AtMostOnce.class));
|
|| method.isAnnotationPresent(AtMostOnce.class));
|
||||||
|
@ -288,6 +288,7 @@
|
|||||||
import org.apache.hadoop.io.IOUtils;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
import org.apache.hadoop.io.Text;
|
import org.apache.hadoop.io.Text;
|
||||||
import org.apache.hadoop.ipc.CallerContext;
|
import org.apache.hadoop.ipc.CallerContext;
|
||||||
|
import org.apache.hadoop.ipc.ObserverRetryOnActiveException;
|
||||||
import org.apache.hadoop.ipc.RetriableException;
|
import org.apache.hadoop.ipc.RetriableException;
|
||||||
import org.apache.hadoop.ipc.RetryCache;
|
import org.apache.hadoop.ipc.RetryCache;
|
||||||
import org.apache.hadoop.ipc.Server;
|
import org.apache.hadoop.ipc.Server;
|
||||||
@ -1976,6 +1977,14 @@ LocatedBlocks getBlockLocations(String clientMachine, String srcArg,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
} else if (haEnabled && haContext != null &&
|
||||||
|
haContext.getState().getServiceState() == OBSERVER) {
|
||||||
|
for (LocatedBlock b : res.blocks.getLocatedBlocks()) {
|
||||||
|
if (b.getLocations() == null || b.getLocations().length == 0) {
|
||||||
|
throw new ObserverRetryOnActiveException("Zero blocklocations for "
|
||||||
|
+ srcArg);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} catch (AccessControlException e) {
|
} catch (AccessControlException e) {
|
||||||
logAuditEvent(false, operationName, srcArg);
|
logAuditEvent(false, operationName, srcArg);
|
||||||
|
@ -283,12 +283,11 @@ public void testObserverNodeSafeModeWithBlockLocations() throws Exception {
|
|||||||
|
|
||||||
// Mock block manager for observer to generate some fake blocks which
|
// Mock block manager for observer to generate some fake blocks which
|
||||||
// will trigger the (retriable) safe mode exception.
|
// will trigger the (retriable) safe mode exception.
|
||||||
final DatanodeInfo[] empty = {};
|
|
||||||
BlockManager bmSpy =
|
BlockManager bmSpy =
|
||||||
NameNodeAdapter.spyOnBlockManager(dfsCluster.getNameNode(2));
|
NameNodeAdapter.spyOnBlockManager(dfsCluster.getNameNode(2));
|
||||||
doAnswer((invocation) -> {
|
doAnswer((invocation) -> {
|
||||||
ExtendedBlock b = new ExtendedBlock("fake-pool", new Block(12345L));
|
ExtendedBlock b = new ExtendedBlock("fake-pool", new Block(12345L));
|
||||||
LocatedBlock fakeBlock = new LocatedBlock(b, empty);
|
LocatedBlock fakeBlock = new LocatedBlock(b, DatanodeInfo.EMPTY_ARRAY);
|
||||||
List<LocatedBlock> fakeBlocks = new ArrayList<>();
|
List<LocatedBlock> fakeBlocks = new ArrayList<>();
|
||||||
fakeBlocks.add(fakeBlock);
|
fakeBlocks.add(fakeBlock);
|
||||||
return new LocatedBlocks(0, false, fakeBlocks, null, true, null, null);
|
return new LocatedBlocks(0, false, fakeBlocks, null, true, null, null);
|
||||||
@ -300,10 +299,42 @@ public void testObserverNodeSafeModeWithBlockLocations() throws Exception {
|
|||||||
dfs.open(testPath).close();
|
dfs.open(testPath).close();
|
||||||
assertSentTo(0);
|
assertSentTo(0);
|
||||||
|
|
||||||
|
Mockito.reset(bmSpy);
|
||||||
|
|
||||||
// Remove safe mode on observer, request should still go to it.
|
// Remove safe mode on observer, request should still go to it.
|
||||||
dfsCluster.getFileSystem(2).setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
|
dfsCluster.getFileSystem(2).setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
|
||||||
dfs.open(testPath).close();
|
dfs.open(testPath).close();
|
||||||
assertSentTo(2);
|
assertSentTo(2);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testObserverNodeBlockMissingRetry() throws Exception {
|
||||||
|
setObserverRead(true);
|
||||||
|
|
||||||
|
dfs.create(testPath, (short)1).close();
|
||||||
|
assertSentTo(0);
|
||||||
|
|
||||||
|
dfsCluster.rollEditLogAndTail(0);
|
||||||
|
|
||||||
|
// Mock block manager for observer to generate some fake blocks which
|
||||||
|
// will trigger the block missing exception.
|
||||||
|
|
||||||
|
BlockManager bmSpy = NameNodeAdapter
|
||||||
|
.spyOnBlockManager(dfsCluster.getNameNode(2));
|
||||||
|
doAnswer((invocation) -> {
|
||||||
|
List<LocatedBlock> fakeBlocks = new ArrayList<>();
|
||||||
|
// Remove the datanode info for the only block so it will throw
|
||||||
|
// BlockMissingException and retry.
|
||||||
|
ExtendedBlock b = new ExtendedBlock("fake-pool", new Block(12345L));
|
||||||
|
LocatedBlock fakeBlock = new LocatedBlock(b, DatanodeInfo.EMPTY_ARRAY);
|
||||||
|
fakeBlocks.add(fakeBlock);
|
||||||
|
return new LocatedBlocks(0, false, fakeBlocks, null, true, null, null);
|
||||||
|
}).when(bmSpy).createLocatedBlocks(Mockito.any(), anyLong(),
|
||||||
|
anyBoolean(), anyLong(), anyLong(), anyBoolean(), anyBoolean(),
|
||||||
|
Mockito.any(), Mockito.any());
|
||||||
|
|
||||||
|
dfs.open(testPath);
|
||||||
|
assertSentTo(0);
|
||||||
|
|
||||||
Mockito.reset(bmSpy);
|
Mockito.reset(bmSpy);
|
||||||
}
|
}
|
||||||
|
@ -32,6 +32,7 @@
|
|||||||
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||||
|
import org.apache.hadoop.ipc.ObserverRetryOnActiveException;
|
||||||
import org.apache.hadoop.ipc.RemoteException;
|
import org.apache.hadoop.ipc.RemoteException;
|
||||||
import org.apache.hadoop.ipc.StandbyException;
|
import org.apache.hadoop.ipc.StandbyException;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
@ -279,6 +280,26 @@ public void testSingleObserverToStandby() throws Exception {
|
|||||||
assertHandledBy(1);
|
assertHandledBy(1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testObserverRetriableException() throws Exception {
|
||||||
|
setupProxyProvider(3);
|
||||||
|
namenodeAnswers[0].setActiveState();
|
||||||
|
namenodeAnswers[1].setObserverState();
|
||||||
|
namenodeAnswers[2].setObserverState();
|
||||||
|
|
||||||
|
// Set the first observer to throw "ObserverRetryOnActiveException" so that
|
||||||
|
// the request should skip the second observer and be served by the active.
|
||||||
|
namenodeAnswers[1].setRetryActive(true);
|
||||||
|
|
||||||
|
doRead();
|
||||||
|
assertHandledBy(0);
|
||||||
|
|
||||||
|
namenodeAnswers[1].setRetryActive(false);
|
||||||
|
|
||||||
|
doRead();
|
||||||
|
assertHandledBy(1);
|
||||||
|
}
|
||||||
|
|
||||||
private void doRead() throws Exception {
|
private void doRead() throws Exception {
|
||||||
doRead(proxyProvider.getProxy().proxy);
|
doRead(proxyProvider.getProxy().proxy);
|
||||||
}
|
}
|
||||||
@ -310,6 +331,8 @@ private static void doRead(ClientProtocol client) throws Exception {
|
|||||||
private static class NameNodeAnswer {
|
private static class NameNodeAnswer {
|
||||||
|
|
||||||
private volatile boolean unreachable = false;
|
private volatile boolean unreachable = false;
|
||||||
|
private volatile boolean retryActive = false;
|
||||||
|
|
||||||
// Standby state by default
|
// Standby state by default
|
||||||
private volatile boolean allowWrites = false;
|
private volatile boolean allowWrites = false;
|
||||||
private volatile boolean allowReads = false;
|
private volatile boolean allowReads = false;
|
||||||
@ -340,6 +363,12 @@ public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
|
|||||||
if (unreachable) {
|
if (unreachable) {
|
||||||
throw new IOException("Unavailable");
|
throw new IOException("Unavailable");
|
||||||
}
|
}
|
||||||
|
if (retryActive) {
|
||||||
|
throw new RemoteException(
|
||||||
|
ObserverRetryOnActiveException.class.getCanonicalName(),
|
||||||
|
"Try active!"
|
||||||
|
);
|
||||||
|
}
|
||||||
switch (invocationOnMock.getMethod().getName()) {
|
switch (invocationOnMock.getMethod().getName()) {
|
||||||
case "reportBadBlocks":
|
case "reportBadBlocks":
|
||||||
if (!allowWrites) {
|
if (!allowWrites) {
|
||||||
@ -379,6 +408,9 @@ void setObserverState() {
|
|||||||
allowWrites = false;
|
allowWrites = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void setRetryActive(boolean shouldRetryActive) {
|
||||||
|
retryActive = shouldRetryActive;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user