From 8c491350789a676cc8fbefab6414773054b9b495 Mon Sep 17 00:00:00 2001 From: Chao Sun Date: Tue, 23 Oct 2018 22:36:23 -0700 Subject: [PATCH] HDFS-13924. [SBN read] Handle BlockMissingException when reading from observer. Contributed by Chao Sun. --- .../ipc/ObserverRetryOnActiveException.java | 35 +++++++++++++++++++ .../ha/ObserverReadProxyProvider.java | 12 +++++++ .../hdfs/server/namenode/FSNamesystem.java | 9 +++++ .../server/namenode/ha/TestObserverNode.java | 35 +++++++++++++++++-- .../ha/TestObserverReadProxyProvider.java | 32 +++++++++++++++++ 5 files changed, 121 insertions(+), 2 deletions(-) create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ObserverRetryOnActiveException.java diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ObserverRetryOnActiveException.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ObserverRetryOnActiveException.java new file mode 100644 index 0000000000..7e67b0cdf7 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ObserverRetryOnActiveException.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ipc; + +import org.apache.hadoop.classification.InterfaceStability; + +import java.io.IOException; + +/** + * Thrown by a remote ObserverNode indicating the operation has failed and the + * client should retry active namenode directly (instead of retry other + * ObserverNodes). + */ +@InterfaceStability.Evolving +public class ObserverRetryOnActiveException extends IOException { + static final long serialVersionUID = 1L; + public ObserverRetryOnActiveException(String msg) { + super(msg); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java index 690ee0bf64..87ca718f89 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java @@ -37,7 +37,9 @@ import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.io.retry.RetryPolicy.RetryAction; import org.apache.hadoop.ipc.AlignmentContext; +import org.apache.hadoop.ipc.ObserverRetryOnActiveException; import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ipc.RemoteException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -263,6 +265,16 @@ public Object invoke(Object proxy, final Method method, final Object[] args) throw ite.getCause(); } Exception e = (Exception) ite.getCause(); + if (e instanceof RemoteException) { + RemoteException re = (RemoteException) e; + Exception unwrapped = re.unwrapRemoteException( + ObserverRetryOnActiveException.class); + if (unwrapped instanceof ObserverRetryOnActiveException) { + LOG.info("Encountered ObserverRetryOnActiveException from {}." + + " Retry active namenode directly.", current.proxyInfo); + break; + } + } RetryAction retryInfo = observerRetryPolicy.shouldRetry(e, 0, 0, method.isAnnotationPresent(Idempotent.class) || method.isAnnotationPresent(AtMostOnce.class)); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index 16f3983acc..40e42c4d1e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -288,6 +288,7 @@ import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.Text; import org.apache.hadoop.ipc.CallerContext; +import org.apache.hadoop.ipc.ObserverRetryOnActiveException; import org.apache.hadoop.ipc.RetriableException; import org.apache.hadoop.ipc.RetryCache; import org.apache.hadoop.ipc.Server; @@ -1976,6 +1977,14 @@ LocatedBlocks getBlockLocations(String clientMachine, String srcArg, } } } + } else if (haEnabled && haContext != null && + haContext.getState().getServiceState() == OBSERVER) { + for (LocatedBlock b : res.blocks.getLocatedBlocks()) { + if (b.getLocations() == null || b.getLocations().length == 0) { + throw new ObserverRetryOnActiveException("Zero blocklocations for " + + srcArg); + } + } } } catch (AccessControlException e) { logAuditEvent(false, operationName, srcArg); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java index b18c5b8e1a..d8e0cfa059 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java @@ -283,12 +283,11 @@ public void testObserverNodeSafeModeWithBlockLocations() throws Exception { // Mock block manager for observer to generate some fake blocks which // will trigger the (retriable) safe mode exception. - final DatanodeInfo[] empty = {}; BlockManager bmSpy = NameNodeAdapter.spyOnBlockManager(dfsCluster.getNameNode(2)); doAnswer((invocation) -> { ExtendedBlock b = new ExtendedBlock("fake-pool", new Block(12345L)); - LocatedBlock fakeBlock = new LocatedBlock(b, empty); + LocatedBlock fakeBlock = new LocatedBlock(b, DatanodeInfo.EMPTY_ARRAY); List fakeBlocks = new ArrayList<>(); fakeBlocks.add(fakeBlock); return new LocatedBlocks(0, false, fakeBlocks, null, true, null, null); @@ -300,10 +299,42 @@ public void testObserverNodeSafeModeWithBlockLocations() throws Exception { dfs.open(testPath).close(); assertSentTo(0); + Mockito.reset(bmSpy); + // Remove safe mode on observer, request should still go to it. dfsCluster.getFileSystem(2).setSafeMode(SafeModeAction.SAFEMODE_LEAVE); dfs.open(testPath).close(); assertSentTo(2); + } + + @Test + public void testObserverNodeBlockMissingRetry() throws Exception { + setObserverRead(true); + + dfs.create(testPath, (short)1).close(); + assertSentTo(0); + + dfsCluster.rollEditLogAndTail(0); + + // Mock block manager for observer to generate some fake blocks which + // will trigger the block missing exception. + + BlockManager bmSpy = NameNodeAdapter + .spyOnBlockManager(dfsCluster.getNameNode(2)); + doAnswer((invocation) -> { + List fakeBlocks = new ArrayList<>(); + // Remove the datanode info for the only block so it will throw + // BlockMissingException and retry. + ExtendedBlock b = new ExtendedBlock("fake-pool", new Block(12345L)); + LocatedBlock fakeBlock = new LocatedBlock(b, DatanodeInfo.EMPTY_ARRAY); + fakeBlocks.add(fakeBlock); + return new LocatedBlocks(0, false, fakeBlocks, null, true, null, null); + }).when(bmSpy).createLocatedBlocks(Mockito.any(), anyLong(), + anyBoolean(), anyLong(), anyLong(), anyBoolean(), anyBoolean(), + Mockito.any(), Mockito.any()); + + dfs.open(testPath); + assertSentTo(0); Mockito.reset(bmSpy); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverReadProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverReadProxyProvider.java index 3f56c96cd4..dfd8488301 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverReadProxyProvider.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverReadProxyProvider.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.ipc.ObserverRetryOnActiveException; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.StandbyException; import org.apache.hadoop.security.UserGroupInformation; @@ -279,6 +280,26 @@ public void testSingleObserverToStandby() throws Exception { assertHandledBy(1); } + @Test + public void testObserverRetriableException() throws Exception { + setupProxyProvider(3); + namenodeAnswers[0].setActiveState(); + namenodeAnswers[1].setObserverState(); + namenodeAnswers[2].setObserverState(); + + // Set the first observer to throw "ObserverRetryOnActiveException" so that + // the request should skip the second observer and be served by the active. + namenodeAnswers[1].setRetryActive(true); + + doRead(); + assertHandledBy(0); + + namenodeAnswers[1].setRetryActive(false); + + doRead(); + assertHandledBy(1); + } + private void doRead() throws Exception { doRead(proxyProvider.getProxy().proxy); } @@ -310,6 +331,8 @@ private static void doRead(ClientProtocol client) throws Exception { private static class NameNodeAnswer { private volatile boolean unreachable = false; + private volatile boolean retryActive = false; + // Standby state by default private volatile boolean allowWrites = false; private volatile boolean allowReads = false; @@ -340,6 +363,12 @@ public Void answer(InvocationOnMock invocationOnMock) throws Throwable { if (unreachable) { throw new IOException("Unavailable"); } + if (retryActive) { + throw new RemoteException( + ObserverRetryOnActiveException.class.getCanonicalName(), + "Try active!" + ); + } switch (invocationOnMock.getMethod().getName()) { case "reportBadBlocks": if (!allowWrites) { @@ -379,6 +408,9 @@ void setObserverState() { allowWrites = false; } + void setRetryActive(boolean shouldRetryActive) { + retryActive = shouldRetryActive; + } } }