HDFS-14160. [SBN read] ObserverReadInvocationHandler should implement RpcInvocationHandler. Contributed by Konstantin V Shvachko.

This commit is contained in:
Konstantin V Shvachko 2018-12-19 12:39:57 -08:00
parent fa8550337d
commit c9d7737431
4 changed files with 32 additions and 6 deletions

View File

@ -19,7 +19,6 @@
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException; import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.lang.reflect.Proxy; import java.lang.reflect.Proxy;
@ -39,9 +38,11 @@
import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryPolicy.RetryAction; import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
import org.apache.hadoop.ipc.AlignmentContext; import org.apache.hadoop.ipc.AlignmentContext;
import org.apache.hadoop.ipc.Client.ConnectionId;
import org.apache.hadoop.ipc.ObserverRetryOnActiveException; import org.apache.hadoop.ipc.ObserverRetryOnActiveException;
import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.RpcInvocationHandler;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -239,7 +240,7 @@ private synchronized void changeProxy(NNProxyInfo<T> initial) {
* *
* Write requests are always forwarded to the active. * Write requests are always forwarded to the active.
*/ */
private class ObserverReadInvocationHandler implements InvocationHandler { private class ObserverReadInvocationHandler implements RpcInvocationHandler {
@Override @Override
public Object invoke(Object proxy, final Method method, final Object[] args) public Object invoke(Object proxy, final Method method, final Object[] args)
@ -322,6 +323,14 @@ public Object invoke(Object proxy, final Method method, final Object[] args)
lastProxy = activeProxy; lastProxy = activeProxy;
return retVal; return retVal;
} }
@Override
public void close() throws IOException {}
@Override
public ConnectionId getConnectionId() {
return RPC.getConnectionIdForProxy(getCurrentProxy().proxy);
}
} }
@Override @Override

View File

@ -165,7 +165,7 @@ public class TestFsck {
private static final String LINE_SEPARATOR = private static final String LINE_SEPARATOR =
System.getProperty("line.separator"); System.getProperty("line.separator");
static String runFsck(Configuration conf, int expectedErrCode, public static String runFsck(Configuration conf, int expectedErrCode,
boolean checkErrorCode, String... path) boolean checkErrorCode, String... path)
throws Exception { throws Exception {
ByteArrayOutputStream bStream = new ByteArrayOutputStream(); ByteArrayOutputStream bStream = new ByteArrayOutputStream();

View File

@ -176,10 +176,14 @@ DistributedFileSystem configureObserverReadFs(
MiniDFSCluster cluster, Configuration conf, MiniDFSCluster cluster, Configuration conf,
Class<P> classFPP, boolean isObserverReadEnabled) Class<P> classFPP, boolean isObserverReadEnabled)
throws IOException, URISyntaxException { throws IOException, URISyntaxException {
conf = new Configuration(conf); String logicalName = conf.get(DFSConfigKeys.DFS_NAMESERVICES);
setupHAConfiguration(cluster, conf, 0, classFPP); URI nnUri = new URI(HdfsConstants.HDFS_URI_SCHEME + "://" + logicalName);
conf.set(HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX
+ "." + logicalName, classFPP.getName());
conf.set("fs.defaultFS", nnUri.toString());
DistributedFileSystem dfs = (DistributedFileSystem) DistributedFileSystem dfs = (DistributedFileSystem)
FileSystem.get(getLogicalUri(cluster), conf); FileSystem.get(nnUri, conf);
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
P provider = (P) ((RetryInvocationHandler<?>) Proxy.getInvocationHandler( P provider = (P) ((RetryInvocationHandler<?>) Proxy.getInvocationHandler(
dfs.getClient().getNamenode())).getProxyProvider(); dfs.getClient().getNamenode())).getProxyProvider();

View File

@ -49,6 +49,7 @@
import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster; import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.server.namenode.TestFsck;
import org.junit.After; import org.junit.After;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Before; import org.junit.Before;
@ -339,6 +340,18 @@ public void testObserverNodeBlockMissingRetry() throws Exception {
Mockito.reset(bmSpy); Mockito.reset(bmSpy);
} }
@Test
public void testFsckWithObserver() throws Exception {
setObserverRead(true);
dfs.create(testPath, (short)1).close();
assertSentTo(0);
final String result = TestFsck.runFsck(conf, 0, true, "/");
LOG.info("result=" + result);
assertTrue(result.contains("Status: HEALTHY"));
}
private void assertSentTo(int nnIdx) throws IOException { private void assertSentTo(int nnIdx) throws IOException {
assertTrue("Request was not sent to the expected namenode " + nnIdx, assertTrue("Request was not sent to the expected namenode " + nnIdx,
HATestUtil.isSentToAnyOfNameNodes(dfs, dfsCluster, nnIdx)); HATestUtil.isSentToAnyOfNameNodes(dfs, dfsCluster, nnIdx));