From 55b3a718e95e62cdd01789050376b36d8e6a0f20 Mon Sep 17 00:00:00 2001 From: Erik Krogen Date: Fri, 8 Mar 2019 10:35:31 -0800 Subject: [PATCH] HDFS-14211. [SBN Read]. Add a configurable flag to enable always-msync mode to ObserverReadProxyProvider. Contributed by Erik Krogen. --- .../ha/ObserverReadProxyProvider.java | 64 +++++++++++++++++++ .../src/site/markdown/ObserverNameNode.md | 48 +++++++++++++- .../ha/TestConsistentReadsObserver.java | 56 +++++++++++++--- 3 files changed, 156 insertions(+), 12 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java index a17c640402..b4130d714b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java @@ -23,6 +23,7 @@ import java.lang.reflect.Method; import java.lang.reflect.Proxy; import java.net.URI; +import java.util.concurrent.TimeUnit; import java.util.List; import org.apache.hadoop.classification.InterfaceAudience; @@ -43,6 +44,7 @@ import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RpcInvocationHandler; +import org.apache.hadoop.util.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -68,6 +70,12 @@ public class ObserverReadProxyProvider private static final Logger LOG = LoggerFactory.getLogger( ObserverReadProxyProvider.class); + /** Configuration key for {@link #autoMsyncPeriodMs}. */ + static final String AUTO_MSYNC_PERIOD_KEY_PREFIX = + HdfsClientConfigKeys.Failover.PREFIX + "observer.auto-msync-period"; + /** Auto-msync disabled by default. 
*/ + static final long AUTO_MSYNC_PERIOD_DEFAULT = -1; + /** Client-side context for syncing with the NameNode server side. */ private final AlignmentContext alignmentContext; @@ -87,6 +95,24 @@ public class ObserverReadProxyProvider */ private boolean observerReadEnabled; + /** + * This adjusts how frequently this proxy provider should auto-msync to the + * Active NameNode, automatically performing an msync() call to the active + * to fetch the current transaction ID before submitting read requests to + * observer nodes. See HDFS-14211 for more description of this feature. + * If this is below 0, never auto-msync. If this is 0, perform an msync on + * every read operation. If this is above 0, perform an msync after this many + * ms have elapsed since the last msync. + */ + private final long autoMsyncPeriodMs; + + /** + * The time, in millisecond epoch, that the last msync operation was + * performed. This includes any implicit msync (any operation which is + * serviced by the Active NameNode). + */ + private volatile long lastMsyncTimeMs = -1; + /** * A client using an ObserverReadProxyProvider should first sync with the * active NameNode on startup. This ensures that the client reads data which @@ -154,6 +180,12 @@ public ObserverReadProxyProvider( ObserverReadInvocationHandler.class.getClassLoader(), new Class[] {xface}, new ObserverReadInvocationHandler()); combinedProxy = new ProxyInfo<>(wrappedProxy, combinedInfo.toString()); + + autoMsyncPeriodMs = conf.getTimeDuration( + // The host of the URI is the nameservice ID + AUTO_MSYNC_PERIOD_KEY_PREFIX + "." 
+ uri.getHost(), + AUTO_MSYNC_PERIOD_DEFAULT, TimeUnit.MILLISECONDS); + // TODO : make this configurable or remove this variable this.observerReadEnabled = true; } @@ -247,6 +279,35 @@ private synchronized void initializeMsync() throws IOException { } failoverProxy.getProxy().proxy.msync(); msynced = true; + lastMsyncTimeMs = Time.monotonicNow(); + } + + /** + * This will call {@link ClientProtocol#msync()} on the active NameNode + * (via the {@link #failoverProxy}) to update the state of this client, only + * if at least {@link #autoMsyncPeriodMs} ms has elapsed since the last time + * an msync was performed. + * + * @see #autoMsyncPeriodMs + */ + private void autoMsyncIfNecessary() throws IOException { + if (autoMsyncPeriodMs == 0) { + // Always msync + failoverProxy.getProxy().proxy.msync(); + } else if (autoMsyncPeriodMs > 0) { + if (Time.monotonicNow() - lastMsyncTimeMs > autoMsyncPeriodMs) { + synchronized (this) { + // Use a synchronized block so that only one thread will msync + // if many operations are submitted around the same time. + // Re-check the entry criterion since the status may have changed + // while waiting for the lock. + if (Time.monotonicNow() - lastMsyncTimeMs > autoMsyncPeriodMs) { + failoverProxy.getProxy().proxy.msync(); + lastMsyncTimeMs = Time.monotonicNow(); + } + } + } + } } /** @@ -273,6 +334,8 @@ public Object invoke(Object proxy, final Method method, final Object[] args) // An msync() must first be performed to ensure that this client is // up-to-date with the active's state. This will only be done once. initializeMsync(); + } else { + autoMsyncIfNecessary(); } int failedObserverCount = 0; @@ -349,6 +412,7 @@ public Object invoke(Object proxy, final Method method, final Object[] args) // If this was reached, the request reached the active, so the // state is up-to-date with active and no further msync is needed. 
msynced = true; + lastMsyncTimeMs = Time.monotonicNow(); lastProxy = activeProxy; return retVal; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ObserverNameNode.md b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ObserverNameNode.md index d93256c475..b212f00b1b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ObserverNameNode.md +++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ObserverNameNode.md @@ -61,9 +61,12 @@ ID, which is implemented using transaction ID within NameNode, is introduced in RPC headers. When a client performs write through Active NameNode, it updates its state ID using the latest transaction ID from the NameNode. When performing a subsequent read, the client passes this -state ID to Observe NameNode, which will then check against its own +state ID to Observer NameNode, which will then check against its own transaction ID, and will ensure its own transaction ID has caught up -with the request's state ID, before serving the read request. +with the request's state ID, before serving the read request. This ensures +"read your own writes" semantics from a single client. Maintaining +consistency between multiple clients in the face of out-of-band communication +is discussed in the "Maintaining Client Consistency" section below. Edit log tailing is critical for Observer NameNode as it directly affects the latency between when a transaction is applied in Active NameNode and @@ -83,6 +86,32 @@ available in the cluster, and only fall back to Active NameNode if all of the former failed. Similarly, ObserverReadProxyProviderWithIPFailover is introduced to replace IPFailoverProxyProvider in a IP failover setup. +### Maintaining Client Consistency + +As discussed above, a client 'foo' will update its state ID upon every request +to the Active NameNode, which includes all write operations. 
Any request +directed to an Observer NameNode will wait until the Observer has seen +this transaction ID, ensuring that the client is able to read all of its own +writes. However, if 'foo' sends an out-of-band (i.e., non-HDFS) message to +client 'bar' telling it that a write has been performed, a subsequent read by +'bar' may not see the recent write by 'foo'. To prevent this inconsistent +behavior, a new `msync()`, or "metadata sync", command has been added. When +`msync()` is called on a client, it will update its state ID against the +Active NameNode -- a very lightweight operation -- so that subsequent reads +are guaranteed to be consistent up to the point of the `msync()`. Thus as long +as 'bar' calls `msync()` before performing its read, it is guaranteed to see +the write made by 'foo'. + +To make use of `msync()`, an application does not necessarily have to make any +code changes. Upon startup, a client will automatically call `msync()` before +performing any reads against an Observer, so that any writes performed prior +to the initialization of the client will be visible. In addition, there is +a configurable "auto-msync" mode supported by ObserverReadProxyProvider which +will automatically perform an `msync()` at some configurable interval, to +prevent a client from ever seeing data that is more stale than a time bound. +There is some overhead associated with this, as each refresh requires an RPC +to the Active NameNode, so it is disabled by default. + Deployment ----------- @@ -185,3 +214,18 @@ implementation, in the client-side **hdfs-site.xml** configuration file: Clients who do not wish to use Observer NameNode can still use the existing ConfiguredFailoverProxyProvider and should not see any behavior change. + +Clients who wish to make use of the "auto-msync" functionality should adjust +the configuration below. 
This will specify some time period after which, +if the client's state ID has not been updated from the Active NameNode, an +`msync()` will automatically be performed. If this is specified as 0, an +`msync()` will be performed before _every_ read operation. If this is a +positive time duration, an `msync()` will be performed every time a read +operation is requested and the Active has not been contacted for longer than +that period. If this is negative (the default), no automatic `msync()` will +be performed. + + <property> + <name>dfs.client.failover.observer.auto-msync-period.<nameservice></name> + <value>500ms</value> + </property> diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java index 2bed37ca8c..1ec47ca68a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java @@ -22,6 +22,8 @@ import static org.junit.Assert.fail; import java.io.IOException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.conf.Configuration; @@ -34,6 +36,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster; import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; import org.apache.hadoop.ipc.RpcScheduler; import org.apache.hadoop.ipc.Schedulable; import org.apache.hadoop.test.GenericTestUtils; @@ -57,7 +60,7 @@ public class TestConsistentReadsObserver { private static Configuration conf; private static MiniQJMHACluster qjmhaCluster; private static MiniDFSCluster dfsCluster; - private static DistributedFileSystem dfs;
+ private DistributedFileSystem dfs; private final Path testPath= new Path("/TestConsistentReadsObserver"); @@ -74,7 +77,7 @@ public static void startUpCluster() throws Exception { @Before public void setUp() throws Exception { - setObserverRead(true); + dfs = setObserverRead(true); } @After @@ -106,8 +109,7 @@ public void testRequeueCall() throws Exception { configuration.setBoolean(prefix + CommonConfigurationKeys.IPC_BACKOFF_ENABLE, true); - dfsCluster.restartNameNode(observerIdx); - dfsCluster.transitionToObserver(observerIdx); + NameNodeAdapter.getRpcServer(nn).refreshCallQueue(configuration); dfs.create(testPath, (short)1).close(); assertSentTo(0); @@ -151,18 +153,26 @@ public void testMsyncSimple() throws Exception { assertEquals(1, readStatus.get()); } - @Test - public void testMsync() throws Exception { + private void testMsync(boolean autoMsync, long autoMsyncPeriodMs) + throws Exception { // 0 == not completed, 1 == succeeded, -1 == failed AtomicInteger readStatus = new AtomicInteger(0); Configuration conf2 = new Configuration(conf); // Disable FS cache so two different DFS clients will be used. conf2.setBoolean("fs.hdfs.impl.disable.cache", true); + if (autoMsync) { + conf2.setTimeDuration( + ObserverReadProxyProvider.AUTO_MSYNC_PERIOD_KEY_PREFIX + + "." + dfs.getUri().getHost(), + autoMsyncPeriodMs, TimeUnit.MILLISECONDS); + } DistributedFileSystem dfs2 = (DistributedFileSystem) FileSystem.get(conf2); // Initialize the proxies for Observer Node. dfs.getClient().getHAServiceState(); + // This initialization will perform the msync-on-startup, so that another + // form of msync is required later dfs2.getClient().getHAServiceState(); // Advance Observer's state ID so it is ahead of client's. @@ -176,7 +186,12 @@ public void testMsync() throws Exception { try { // After msync, client should have the latest state ID from active. // Therefore, the subsequent getFileStatus call should succeed. 
- dfs2.getClient().msync(); + if (!autoMsync) { + // If not testing auto-msync, perform an explicit one here + dfs2.getClient().msync(); + } else if (autoMsyncPeriodMs > 0) { + Thread.sleep(autoMsyncPeriodMs); + } dfs2.getFileStatus(testPath); if (HATestUtil.isSentToAnyOfNameNodes(dfs2, dfsCluster, 2)) { readStatus.set(1); @@ -196,10 +211,31 @@ public void testMsync() throws Exception { dfsCluster.rollEditLogAndTail(0); - GenericTestUtils.waitFor(() -> readStatus.get() != 0, 100, 10000); + GenericTestUtils.waitFor(() -> readStatus.get() != 0, 100, 3000); assertEquals(1, readStatus.get()); } + @Test + public void testExplicitMsync() throws Exception { + testMsync(false, -1); + } + + @Test + public void testAutoMsyncPeriod0() throws Exception { + testMsync(true, 0); + } + + @Test + public void testAutoMsyncPeriod5() throws Exception { + testMsync(true, 5); + } + + @Test(expected = TimeoutException.class) + public void testAutoMsyncLongPeriod() throws Exception { + // This should fail since the auto-msync is never activated + testMsync(true, Long.MAX_VALUE); + } + // A new client should first contact the active, before using an observer, // to ensure that it is up-to-date with the current state @Test @@ -313,8 +349,8 @@ private void assertSentTo(int nnIdx) throws IOException { HATestUtil.isSentToAnyOfNameNodes(dfs, dfsCluster, nnIdx)); } - private static void setObserverRead(boolean flag) throws Exception { - dfs = HATestUtil.configureObserverReadFs( + private DistributedFileSystem setObserverRead(boolean flag) throws Exception { + return HATestUtil.configureObserverReadFs( dfsCluster, conf, ObserverReadProxyProvider.class, flag); }