HDFS-14211. [SBN Read]. Add a configurable flag to enable always-msync mode to ObserverReadProxyProvider. Contributed by Erik Krogen.

This commit is contained in:
Erik Krogen 2019-03-08 10:35:31 -08:00
parent c9e50c48de
commit 55b3a718e9
3 changed files with 156 additions and 12 deletions

View File

@ -23,6 +23,7 @@
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.URI;
import java.util.concurrent.TimeUnit;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
@ -43,6 +44,7 @@
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.RpcInvocationHandler;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -68,6 +70,12 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
private static final Logger LOG = LoggerFactory.getLogger(
ObserverReadProxyProvider.class);
/** Configuration key for {@link #autoMsyncPeriodMs}. */
static final String AUTO_MSYNC_PERIOD_KEY_PREFIX =
HdfsClientConfigKeys.Failover.PREFIX + "observer.auto-msync-period";
/** Auto-msync disabled by default. */
static final long AUTO_MSYNC_PERIOD_DEFAULT = -1;
/** Client-side context for syncing with the NameNode server side. */
private final AlignmentContext alignmentContext;
@ -87,6 +95,24 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
*/
private boolean observerReadEnabled;
/**
* This adjusts how frequently this proxy provider should auto-msync to the
* Active NameNode, automatically performing an msync() call to the active
* to fetch the current transaction ID before submitting read requests to
* observer nodes. See HDFS-14211 for more description of this feature.
* If this is below 0, never auto-msync. If this is 0, perform an msync on
* every read operation. If this is above 0, perform an msync after this many
* ms have elapsed since the last msync.
*/
private final long autoMsyncPeriodMs;
/**
* The time, in millisecond epoch, that the last msync operation was
* performed. This includes any implicit msync (any operation which is
* serviced by the Active NameNode).
*/
private volatile long lastMsyncTimeMs = -1;
/**
* A client using an ObserverReadProxyProvider should first sync with the
* active NameNode on startup. This ensures that the client reads data which
@ -154,6 +180,12 @@ public ObserverReadProxyProvider(
ObserverReadInvocationHandler.class.getClassLoader(),
new Class<?>[] {xface}, new ObserverReadInvocationHandler());
combinedProxy = new ProxyInfo<>(wrappedProxy, combinedInfo.toString());
autoMsyncPeriodMs = conf.getTimeDuration(
// The host of the URI is the nameservice ID
AUTO_MSYNC_PERIOD_KEY_PREFIX + "." + uri.getHost(),
AUTO_MSYNC_PERIOD_DEFAULT, TimeUnit.MILLISECONDS);
// TODO : make this configurable or remove this variable
this.observerReadEnabled = true;
}
@ -247,6 +279,35 @@ private synchronized void initializeMsync() throws IOException {
}
failoverProxy.getProxy().proxy.msync();
msynced = true;
lastMsyncTimeMs = Time.monotonicNow();
}
/**
* This will call {@link ClientProtocol#msync()} on the active NameNode
* (via the {@link #failoverProxy}) to update the state of this client, only
* if at least {@link #autoMsyncPeriodMs} ms has elapsed since the last time
* an msync was performed.
*
* @see #autoMsyncPeriodMs
*/
private void autoMsyncIfNecessary() throws IOException {
if (autoMsyncPeriodMs == 0) {
// Always msync
failoverProxy.getProxy().proxy.msync();
} else if (autoMsyncPeriodMs > 0) {
if (Time.monotonicNow() - lastMsyncTimeMs > autoMsyncPeriodMs) {
synchronized (this) {
// Use a synchronized block so that only one thread will msync
// if many operations are submitted around the same time.
// Re-check the entry criterion since the status may have changed
// while waiting for the lock.
if (Time.monotonicNow() - lastMsyncTimeMs > autoMsyncPeriodMs) {
failoverProxy.getProxy().proxy.msync();
lastMsyncTimeMs = Time.monotonicNow();
}
}
}
}
}
/**
@ -273,6 +334,8 @@ public Object invoke(Object proxy, final Method method, final Object[] args)
// An msync() must first be performed to ensure that this client is
// up-to-date with the active's state. This will only be done once.
initializeMsync();
} else {
autoMsyncIfNecessary();
}
int failedObserverCount = 0;
@ -349,6 +412,7 @@ public Object invoke(Object proxy, final Method method, final Object[] args)
// If this was reached, the request reached the active, so the
// state is up-to-date with active and no further msync is needed.
msynced = true;
lastMsyncTimeMs = Time.monotonicNow();
lastProxy = activeProxy;
return retVal;
}

View File

@ -61,9 +61,12 @@ ID, which is implemented using transaction ID within NameNode, is
introduced in RPC headers. When a client performs write through Active
NameNode, it updates its state ID using the latest transaction ID from
the NameNode. When performing a subsequent read, the client passes this
state ID to Observe NameNode, which will then check against its own
state ID to Observer NameNode, which will then check against its own
transaction ID, and will ensure its own transaction ID has caught up
with the request's state ID, before serving the read request.
with the request's state ID, before serving the read request. This ensures
"read your own writes" semantics from a single client. Maintaining
consistency between multiple clients in the face of out-of-band communication
is discussed in the "Maintaining Client Consistency" section below.
Edit log tailing is critical for Observer NameNode as it directly affects
the latency between when a transaction is applied in Active NameNode and
@ -83,6 +86,32 @@ available in the cluster, and only fall back to Active NameNode if all
of the former failed. Similarly, ObserverReadProxyProviderWithIPFailover
is introduced to replace IPFailoverProxyProvider in a IP failover setup.
### Maintaining Client Consistency
As discussed above, a client 'foo' will update its state ID upon every request
to the Active NameNode, which includes all write operations. Any request
directed to an Observer NameNode will wait until the Observer has seen
this transaction ID, ensuring that the client is able to read all of its own
writes. However, if 'foo' sends an out-of-band (i.e., non-HDFS) message to
client 'bar' telling it that a write has been performed, a subsequent read by
'bar' may not see the recent write by 'foo'. To prevent this inconsistent
behavior, a new `msync()`, or "metadata sync", command has been added. When
`msync()` is called on a client, it will update its state ID against the
Active NameNode -- a very lightweight operation -- so that subsequent reads
are guaranteed to be consistent up to the point of the `msync()`. Thus as long
as 'bar' calls `msync()` before performing its read, it is guaranteed to see
the write made by 'foo'.
To make use of `msync()`, an application does not necessarily have to make any
code changes. Upon startup, a client will automatically call `msync()` before
performing any reads against an Observer, so that any writes performed prior
to the initialization of the client will be visible. In addition, there is
a configurable "auto-msync" mode supported by ObserverReadProxyProvider which
will automatically perform an `msync()` at some configurable interval, to
prevent a client from ever seeing data that is more stale than a time bound.
There is some overhead associated with this, as each refresh requires an RPC
to the Active NameNode, so it is disabled by default.
Deployment
-----------
@ -185,3 +214,18 @@ implementation, in the client-side **hdfs-site.xml** configuration file:
Clients who do not wish to use Observer NameNode can still use the
existing ConfiguredFailoverProxyProvider and should not see any behavior
change.
Clients who wish to make use of the "auto-msync" functionality should adjust
the configuration below. This will specify some time period after which,
if the client's state ID has not been updated from the Active NameNode, an
`msync()` will automatically be performed. If this is specified as 0, an
`msync()` will be performed before _every_ read operation. If this is a
positive time duration, an `msync()` will be performed every time a read
operation is requested and the Active has not been contacted for longer than
that period. If this is negative (the default), no automatic `msync()` will
be performed.
<property>
<name>dfs.client.failover.observer.auto-msync-period.<nameservice></name>
<value>500ms</value>
</property>

View File

@ -22,6 +22,8 @@
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
@ -34,6 +36,7 @@
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.ipc.RpcScheduler;
import org.apache.hadoop.ipc.Schedulable;
import org.apache.hadoop.test.GenericTestUtils;
@ -57,7 +60,7 @@ public class TestConsistentReadsObserver {
private static Configuration conf;
private static MiniQJMHACluster qjmhaCluster;
private static MiniDFSCluster dfsCluster;
private static DistributedFileSystem dfs;
private DistributedFileSystem dfs;
private final Path testPath= new Path("/TestConsistentReadsObserver");
@ -74,7 +77,7 @@ public static void startUpCluster() throws Exception {
@Before
public void setUp() throws Exception {
setObserverRead(true);
dfs = setObserverRead(true);
}
@After
@ -106,8 +109,7 @@ public void testRequeueCall() throws Exception {
configuration.setBoolean(prefix
+ CommonConfigurationKeys.IPC_BACKOFF_ENABLE, true);
dfsCluster.restartNameNode(observerIdx);
dfsCluster.transitionToObserver(observerIdx);
NameNodeAdapter.getRpcServer(nn).refreshCallQueue(configuration);
dfs.create(testPath, (short)1).close();
assertSentTo(0);
@ -151,18 +153,26 @@ public void testMsyncSimple() throws Exception {
assertEquals(1, readStatus.get());
}
@Test
public void testMsync() throws Exception {
private void testMsync(boolean autoMsync, long autoMsyncPeriodMs)
throws Exception {
// 0 == not completed, 1 == succeeded, -1 == failed
AtomicInteger readStatus = new AtomicInteger(0);
Configuration conf2 = new Configuration(conf);
// Disable FS cache so two different DFS clients will be used.
conf2.setBoolean("fs.hdfs.impl.disable.cache", true);
if (autoMsync) {
conf2.setTimeDuration(
ObserverReadProxyProvider.AUTO_MSYNC_PERIOD_KEY_PREFIX
+ "." + dfs.getUri().getHost(),
autoMsyncPeriodMs, TimeUnit.MILLISECONDS);
}
DistributedFileSystem dfs2 = (DistributedFileSystem) FileSystem.get(conf2);
// Initialize the proxies for Observer Node.
dfs.getClient().getHAServiceState();
// This initialization will perform the msync-on-startup, so that another
// form of msync is required later
dfs2.getClient().getHAServiceState();
// Advance Observer's state ID so it is ahead of client's.
@ -176,7 +186,12 @@ public void testMsync() throws Exception {
try {
// After msync, client should have the latest state ID from active.
// Therefore, the subsequent getFileStatus call should succeed.
dfs2.getClient().msync();
if (!autoMsync) {
// If not testing auto-msync, perform an explicit one here
dfs2.getClient().msync();
} else if (autoMsyncPeriodMs > 0) {
Thread.sleep(autoMsyncPeriodMs);
}
dfs2.getFileStatus(testPath);
if (HATestUtil.isSentToAnyOfNameNodes(dfs2, dfsCluster, 2)) {
readStatus.set(1);
@ -196,10 +211,31 @@ public void testMsync() throws Exception {
dfsCluster.rollEditLogAndTail(0);
GenericTestUtils.waitFor(() -> readStatus.get() != 0, 100, 10000);
GenericTestUtils.waitFor(() -> readStatus.get() != 0, 100, 3000);
assertEquals(1, readStatus.get());
}
@Test
public void testExplicitMsync() throws Exception {
testMsync(false, -1);
}
@Test
public void testAutoMsyncPeriod0() throws Exception {
testMsync(true, 0);
}
@Test
public void testAutoMsyncPeriod5() throws Exception {
testMsync(true, 5);
}
@Test(expected = TimeoutException.class)
public void testAutoMsyncLongPeriod() throws Exception {
// This should fail since the auto-msync is never activated
testMsync(true, Long.MAX_VALUE);
}
// A new client should first contact the active, before using an observer,
// to ensure that it is up-to-date with the current state
@Test
@ -313,8 +349,8 @@ private void assertSentTo(int nnIdx) throws IOException {
HATestUtil.isSentToAnyOfNameNodes(dfs, dfsCluster, nnIdx));
}
private static void setObserverRead(boolean flag) throws Exception {
dfs = HATestUtil.configureObserverReadFs(
private DistributedFileSystem setObserverRead(boolean flag) throws Exception {
return HATestUtil.configureObserverReadFs(
dfsCluster, conf, ObserverReadProxyProvider.class, flag);
}