diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java index c598076f63..7e07d7b654 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java @@ -239,6 +239,18 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic { public static final long FEDERATION_STORE_ROUTER_EXPIRATION_DELETION_MS_DEFAULT = -1; + // HDFS Router-based federation State Store ZK DRIVER + public static final String FEDERATION_STORE_ZK_DRIVER_PREFIX = + RBFConfigKeys.FEDERATION_STORE_PREFIX + "driver.zk."; + public static final String FEDERATION_STORE_ZK_PARENT_PATH = + FEDERATION_STORE_ZK_DRIVER_PREFIX + "parent-path"; + public static final String FEDERATION_STORE_ZK_PARENT_PATH_DEFAULT = + "/hdfs-federation"; + public static final String FEDERATION_STORE_ZK_ASYNC_MAX_THREADS = + FEDERATION_STORE_ZK_DRIVER_PREFIX + "async.max.threads"; + public static final int FEDERATION_STORE_ZK_ASYNC_MAX_THREADS_DEFAULT = + -1; + // HDFS Router safe mode public static final String DFS_ROUTER_SAFEMODE_ENABLE = FEDERATION_ROUTER_PREFIX + "safemode.enable"; diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java index 45442da0ab..7882c8f827 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java @@ -25,7 +25,16 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.imps.CuratorFrameworkState; import org.apache.hadoop.conf.Configuration; @@ -57,14 +66,9 @@ public class StateStoreZooKeeperImpl extends StateStoreSerializableImpl { private static final Logger LOG = LoggerFactory.getLogger(StateStoreZooKeeperImpl.class); - - /** Configuration keys. */ - public static final String FEDERATION_STORE_ZK_DRIVER_PREFIX = - RBFConfigKeys.FEDERATION_STORE_PREFIX + "driver.zk."; - public static final String FEDERATION_STORE_ZK_PARENT_PATH = - FEDERATION_STORE_ZK_DRIVER_PREFIX + "parent-path"; - public static final String FEDERATION_STORE_ZK_PARENT_PATH_DEFAULT = - "/hdfs-federation"; + /** Service to get/update zk state. */ + private ThreadPoolExecutor executorService; + private boolean enableConcurrent; /** Directory to store the state store data. */ @@ -82,8 +86,22 @@ public boolean initDriver() { Configuration conf = getConf(); baseZNode = conf.get( - FEDERATION_STORE_ZK_PARENT_PATH, - FEDERATION_STORE_ZK_PARENT_PATH_DEFAULT); + RBFConfigKeys.FEDERATION_STORE_ZK_PARENT_PATH, + RBFConfigKeys.FEDERATION_STORE_ZK_PARENT_PATH_DEFAULT); + int numThreads = conf.getInt( + RBFConfigKeys.FEDERATION_STORE_ZK_ASYNC_MAX_THREADS, + RBFConfigKeys.FEDERATION_STORE_ZK_ASYNC_MAX_THREADS_DEFAULT); + enableConcurrent = numThreads > 0; + if (enableConcurrent) { + ThreadFactory threadFactory = new ThreadFactoryBuilder() + .setNameFormat("StateStore ZK Client-%d") + .build(); + this.executorService = new ThreadPoolExecutor(numThreads, numThreads, + 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), threadFactory); + LOG.info("Init StateStoreZookeeperImpl by async mode with {} threads.", numThreads); + } else { + LOG.info("Init StateStoreZookeeperImpl by sync mode."); + } try { this.zkManager = new ZKCuratorManager(conf); this.zkManager.start(); @@ -109,8 +127,16 @@ public boolean initRecordStorage( } } + @VisibleForTesting + public void setEnableConcurrent(boolean enableConcurrent) { + this.enableConcurrent = enableConcurrent; + } + @Override public void close() throws Exception { + if (executorService != null) { + executorService.shutdown(); + } if (zkManager != null) { zkManager.close(); } @@ -136,34 +162,21 @@ public QueryResult get(Class clazz) List ret = new ArrayList<>(); String znode = getZNodeForClass(clazz); try { - List children = zkManager.getChildren(znode); - for (String child : children) { - try { - String path = getNodePath(znode, child); - Stat stat = new Stat(); - String data = zkManager.getStringData(path, stat); - boolean corrupted = false; - if (data == null || data.equals("")) { - // All records should have data, otherwise this is corrupted - corrupted = true; - } else { - try { - T record = createRecord(data, stat, clazz); - ret.add(record); - } catch (IOException e) { - LOG.error("Cannot create record type \"{}\" from \"{}\": {}", - clazz.getSimpleName(), data, e.getMessage()); - corrupted = true; - } + List> callables = new ArrayList<>(); + zkManager.getChildren(znode).forEach(c -> callables.add(() -> getRecord(clazz, znode, c))); + if (enableConcurrent) { + List> futures = executorService.invokeAll(callables); + for (Future future : futures) { + if (future.get() != null) { + ret.add(future.get()); } - - if (corrupted) { - LOG.error("Cannot get data for {} at {}, cleaning corrupted data", - child, path); - zkManager.delete(path); + } + } else { + for (Callable callable : callables) { + T record = callable.call(); + if (record != null) { + ret.add(record); } - } catch (Exception e) { - LOG.error("Cannot get data for {}: {}", child, e.getMessage()); } } } catch (Exception e) { @@ -178,6 +191,44 @@ public QueryResult get(Class clazz) return new QueryResult(ret, getTime()); } + /** + * Get one data record in the StateStore or delete it if it's corrupted. + * + * @param clazz Record class to evaluate. + * @param znode The ZNode for the class. + * @param child The child for znode to get. + * @return The record to get. + */ + private T getRecord(Class clazz, String znode, String child) { + T record = null; + try { + String path = getNodePath(znode, child); + Stat stat = new Stat(); + String data = zkManager.getStringData(path, stat); + boolean corrupted = false; + if (data == null || data.equals("")) { + // All records should have data, otherwise this is corrupted + corrupted = true; + } else { + try { + record = createRecord(data, stat, clazz); + } catch (IOException e) { + LOG.error("Cannot create record type \"{}\" from \"{}\": {}", + clazz.getSimpleName(), data, e.getMessage()); + corrupted = true; + } + } + + if (corrupted) { + LOG.error("Cannot get data for {} at {}, cleaning corrupted data", child, path); + zkManager.delete(path); + } + } catch (Exception e) { + LOG.error("Cannot get data for {}: {}", child, e.getMessage()); + } + return record; + } + @Override public boolean putAll( List records, boolean update, boolean error) throws IOException { @@ -192,22 +243,40 @@ public boolean putAll( String znode = getZNodeForClass(recordClass); long start = monotonicNow(); - boolean status = true; - for (T record : records) { - String primaryKey = getPrimaryKey(record); - String recordZNode = getNodePath(znode, primaryKey); - byte[] data = serialize(record); - if (!writeNode(recordZNode, data, update, error)){ - status = false; + final AtomicBoolean status = new AtomicBoolean(true); + List> callables = new ArrayList<>(); + records.forEach(record -> + callables.add( + () -> { + String primaryKey = getPrimaryKey(record); + String recordZNode = getNodePath(znode, primaryKey); + byte[] data = serialize(record); + if (!writeNode(recordZNode, data, update, error)) { + status.set(false); + } + return null; + } + ) + ); + try { + if (enableConcurrent) { + executorService.invokeAll(callables); + } else { + for(Callable callable : callables) { + callable.call(); + } } + } catch (Exception e) { + LOG.error("Write record failed : {}", e.getMessage(), e); + throw new IOException(e); } long end = monotonicNow(); - if (status) { + if (status.get()) { getMetrics().addWrite(end - start); } else { getMetrics().addFailure(end - start); } - return status; + return status.get(); } @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml index 52a1e3a3bd..b5096cd253 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml @@ -377,6 +377,26 @@ + + dfs.federation.router.store.driver.zk.parent-path + /hdfs-federation + + The parent path of zookeeper for StateStoreZooKeeperImpl. + + + + + dfs.federation.router.store.driver.zk.async.max.threads + -1 + + Max threads number of StateStoreZooKeeperImpl in async mode. + The only class currently being supported: + org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreZooKeeperImpl. + Default value is -1, which means StateStoreZooKeeperImpl is working in sync mode. + Use positive integer value to enable async mode. + + + dfs.federation.router.cache.ttl 1m diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java index 5ad01dce8e..4eb38b06b1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java @@ -119,7 +119,7 @@ private T generateRandomEnum(Class enumClass) { } @SuppressWarnings("unchecked") - private T generateFakeRecord(Class recordClass) + protected T generateFakeRecord(Class recordClass) throws IllegalArgumentException, IllegalAccessException, IOException { if (recordClass == MembershipState.class) { diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreZK.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreZK.java index f8be9f0a05..3ad106697a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreZK.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreZK.java @@ -18,12 +18,13 @@ package org.apache.hadoop.hdfs.server.federation.store.driver; import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.getStateStoreConfiguration; -import static org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreZooKeeperImpl.FEDERATION_STORE_ZK_PARENT_PATH; -import static org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreZooKeeperImpl.FEDERATION_STORE_ZK_PARENT_PATH_DEFAULT; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.TimeUnit; import org.apache.curator.framework.CuratorFramework; @@ -40,6 +41,7 @@ import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState; import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; import org.apache.hadoop.hdfs.server.federation.store.records.RouterState; +import org.apache.hadoop.util.Time; import org.apache.zookeeper.CreateMode; import org.junit.AfterClass; import org.junit.Before; @@ -73,9 +75,10 @@ public static void setupCluster() throws Exception { // Disable auto-repair of connection conf.setLong(RBFConfigKeys.FEDERATION_STORE_CONNECTION_TEST_MS, TimeUnit.HOURS.toMillis(1)); + conf.setInt(RBFConfigKeys.FEDERATION_STORE_ZK_ASYNC_MAX_THREADS, 10); - baseZNode = conf.get(FEDERATION_STORE_ZK_PARENT_PATH, - FEDERATION_STORE_ZK_PARENT_PATH_DEFAULT); + baseZNode = conf.get(RBFConfigKeys.FEDERATION_STORE_ZK_PARENT_PATH, + RBFConfigKeys.FEDERATION_STORE_ZK_PARENT_PATH_DEFAULT); getStateStore(conf); } @@ -91,6 +94,8 @@ public static void tearDownCluster() { @Before public void startup() throws IOException { removeAll(getStateStoreDriver()); + StateStoreZooKeeperImpl stateStoreZooKeeper = (StateStoreZooKeeperImpl) getStateStoreDriver(); + stateStoreZooKeeper.setEnableConcurrent(false); } private String generateFakeZNode( @@ -126,33 +131,79 @@ private void testGetNullRecord( assertNull(curatorFramework.checkExists().forPath(znode)); } + @Test + public void testAsyncPerformance() throws Exception { + StateStoreZooKeeperImpl stateStoreDriver = (StateStoreZooKeeperImpl) getStateStoreDriver(); + List insertList = new ArrayList<>(); + for (int i = 0; i < 1000; i++) { + MountTable newRecord = generateFakeRecord(MountTable.class); + insertList.add(newRecord); + } + // Insert Multiple on sync mode + long startSync = Time.now(); + stateStoreDriver.putAll(insertList, true, false); + long endSync = Time.now(); + stateStoreDriver.removeAll(MembershipState.class); + + stateStoreDriver.setEnableConcurrent(true); + // Insert Multiple on async mode + long startAsync = Time.now(); + stateStoreDriver.putAll(insertList, true, false); + long endAsync = Time.now(); + assertTrue((endSync - startSync) > (endAsync - startAsync)); + } + @Test public void testGetNullRecord() throws Exception { - testGetNullRecord(getStateStoreDriver()); + StateStoreZooKeeperImpl stateStoreDriver = (StateStoreZooKeeperImpl) getStateStoreDriver(); + testGetNullRecord(stateStoreDriver); + + // test async mode + stateStoreDriver.setEnableConcurrent(true); + testGetNullRecord(stateStoreDriver); } @Test public void testInsert() throws IllegalArgumentException, IllegalAccessException, IOException { - testInsert(getStateStoreDriver()); + StateStoreZooKeeperImpl stateStoreDriver = (StateStoreZooKeeperImpl) getStateStoreDriver(); + testInsert(stateStoreDriver); + // test async mode + stateStoreDriver.setEnableConcurrent(true); + testInsert(stateStoreDriver); } @Test public void testUpdate() throws IllegalArgumentException, ReflectiveOperationException, IOException, SecurityException { - testPut(getStateStoreDriver()); + StateStoreZooKeeperImpl stateStoreDriver = (StateStoreZooKeeperImpl) getStateStoreDriver(); + testPut(stateStoreDriver); + + // test async mode + stateStoreDriver.setEnableConcurrent(true); + testPut(stateStoreDriver); } @Test public void testDelete() throws IllegalArgumentException, IllegalAccessException, IOException { - testRemove(getStateStoreDriver()); + StateStoreZooKeeperImpl stateStoreDriver = (StateStoreZooKeeperImpl) getStateStoreDriver(); + testRemove(stateStoreDriver); + + // test async mode + stateStoreDriver.setEnableConcurrent(true); + testRemove(stateStoreDriver); } @Test public void testFetchErrors() throws IllegalArgumentException, IllegalAccessException, IOException { - testFetchErrors(getStateStoreDriver()); + StateStoreZooKeeperImpl stateStoreDriver = (StateStoreZooKeeperImpl) getStateStoreDriver(); + testFetchErrors(stateStoreDriver); + + // test async mode + stateStoreDriver.setEnableConcurrent(true); + testFetchErrors(stateStoreDriver); } } \ No newline at end of file