From 937caf7de9e50268ff49af86825eac698fb98d2d Mon Sep 17 00:00:00 2001 From: Viraj Jasani Date: Tue, 4 Apr 2023 06:39:53 -0700 Subject: [PATCH] HDFS-16967. RBF: File based state stores should allow concurrent access to the records (#5523) Reviewed-by: Inigo Goiri Reviewed-by: Simbarashe Dzinamarira Signed-off-by: Takanobu Asanuma --- .../federation/router/RBFConfigKeys.java | 9 + .../driver/impl/StateStoreFileBaseImpl.java | 197 ++++++++++++++---- .../store/driver/impl/StateStoreFileImpl.java | 7 + .../driver/impl/StateStoreFileSystemImpl.java | 9 +- .../src/main/resources/hdfs-rbf-default.xml | 28 +++ .../driver/TestStateStoreDriverBase.java | 1 + .../store/driver/TestStateStoreFile.java | 32 ++- .../driver/TestStateStoreFileSystem.java | 47 +++-- 8 files changed, 268 insertions(+), 62 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java index c0ee950459..f47d6ceb26 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java @@ -255,6 +255,15 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic { public static final int FEDERATION_STORE_ZK_ASYNC_MAX_THREADS_DEFAULT = -1; + // HDFS Router-based federation File based store implementation specific configs + public static final String FEDERATION_STORE_FILE_ASYNC_THREADS = + FEDERATION_STORE_PREFIX + "driver.file.async.threads"; + public static final int FEDERATION_STORE_FILE_ASYNC_THREADS_DEFAULT = 0; + + public static final String FEDERATION_STORE_FS_ASYNC_THREADS = + FEDERATION_STORE_PREFIX + "driver.fs.async.threads"; + public static final int FEDERATION_STORE_FS_ASYNC_THREADS_DEFAULT = 0; + // HDFS Router safe mode public static final String DFS_ROUTER_SAFEMODE_ENABLE = FEDERATION_ROUTER_PREFIX + "safemode.enable"; diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java index c93d919aea..ec3c89b65b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java @@ -25,14 +25,24 @@ import java.io.BufferedWriter; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.regex.Matcher; import java.util.regex.Pattern; +import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; + import org.apache.hadoop.hdfs.server.federation.metrics.StateStoreMetrics; import org.apache.hadoop.hdfs.server.federation.store.StateStoreUnavailableException; import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils; @@ -69,6 +79,8 @@ public abstract class StateStoreFileBaseImpl /** If it is initialized. */ private boolean initialized = false; + private ExecutorService concurrentStoreAccessPool; + /** * Get the reader of a record for the file system. @@ -137,6 +149,8 @@ public abstract BufferedWriter getWriter( */ protected abstract String getRootDir(); + protected abstract int getConcurrentFilesAccessNumThreads(); + /** * Set the driver as initialized. * @@ -168,9 +182,31 @@ public boolean initDriver() { return false; } setInitialized(true); + int threads = getConcurrentFilesAccessNumThreads(); + if (threads > 1) { + this.concurrentStoreAccessPool = + new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<>(), + new ThreadFactoryBuilder() + .setNameFormat("state-store-file-based-concurrent-%d") + .setDaemon(true).build()); + LOG.info("File based state store will be accessed concurrently with {} max threads", threads); + } else { + LOG.info("File based state store will be accessed serially"); + } return true; } + @Override + public void close() throws Exception { + if (this.concurrentStoreAccessPool != null) { + this.concurrentStoreAccessPool.shutdown(); + boolean isTerminated = this.concurrentStoreAccessPool.awaitTermination(5, TimeUnit.SECONDS); + LOG.info("Concurrent store access pool is terminated: {}", isTerminated); + this.concurrentStoreAccessPool = null; + } + } + @Override public boolean initRecordStorage( String className, Class recordClass) { @@ -198,22 +234,29 @@ public QueryResult get(Class clazz) verifyDriverReady(); long start = monotonicNow(); StateStoreMetrics metrics = getMetrics(); - List ret = new ArrayList<>(); + List result = Collections.synchronizedList(new ArrayList<>()); try { String path = getPathForClass(clazz); List children = getChildren(path); - for (String child : children) { - String pathRecord = path + "/" + child; - if (child.endsWith(TMP_MARK)) { - LOG.debug("There is a temporary file {} in {}", child, path); - if (isOldTempRecord(child)) { - LOG.warn("Removing {} as it's an old temporary record", child); - remove(pathRecord); - } - } else { - T record = getRecord(pathRecord, clazz); - ret.add(record); + List> callables = new ArrayList<>(); + children.forEach(child -> callables.add( + () -> getRecordsFromFileAndRemoveOldTmpRecords(clazz, result, path, child))); + if (this.concurrentStoreAccessPool != null) { + // Read records concurrently + List> futures = this.concurrentStoreAccessPool.invokeAll(callables); + for (Future future : futures) { + future.get(); } + } else { + // Read records serially + callables.forEach(e -> { + try { + e.call(); + } catch (Exception ex) { + LOG.error("Failed to retrieve record using file operations.", ex); + throw new RuntimeException(ex); + } + }); } } catch (Exception e) { if (metrics != null) { @@ -227,7 +270,37 @@ public QueryResult get(Class clazz) if (metrics != null) { metrics.addRead(monotonicNow() - start); } - return new QueryResult(ret, getTime()); + return new QueryResult<>(result, getTime()); + } + + /** + * Get the state store record from the given path (path/child) and add the record to the + * result list. + * + * @param clazz Class of the record. + * @param result The list of results record. The records would be added to it unless the given + * path represents old temp file. + * @param path The parent path. + * @param child The child path under the parent path. Both path and child completes the file + * location for the given record. + * @param Record class of the records. + * @return Void. + * @throws IOException If the file read operation fails. + */ + private Void getRecordsFromFileAndRemoveOldTmpRecords(Class clazz, + List result, String path, String child) throws IOException { + String pathRecord = path + "/" + child; + if (child.endsWith(TMP_MARK)) { + LOG.debug("There is a temporary file {} in {}", child, path); + if (isOldTempRecord(child)) { + LOG.warn("Removing {} as it's an old temporary record", child); + remove(pathRecord); + } + } else { + T record = getRecord(pathRecord, clazz); + result.add(record); + } + return null; } /** @@ -260,23 +333,17 @@ public static boolean isOldTempRecord(final String pathRecord) { */ private T getRecord( final String path, final Class clazz) throws IOException { - BufferedReader reader = getReader(path); - try { + try (BufferedReader reader = getReader(path)) { String line; while ((line = reader.readLine()) != null) { if (!line.startsWith("#") && line.length() > 0) { try { - T record = newRecord(line, clazz, false); - return record; + return newRecord(line, clazz, false); } catch (Exception ex) { LOG.error("Cannot parse line {} in file {}", line, path, ex); } } } - } finally { - if (reader != null) { - reader.close(); - } } throw new IOException("Cannot read " + path + " for record " + clazz.getSimpleName()); @@ -330,13 +397,12 @@ public boolean putAll( record.setDateModified(this.getTime()); toWrite.put(recordPath, record); } else if (errorIfExists) { - LOG.error("Attempt to insert record {} that already exists", - recordPath); + LOG.error("Attempt to insert record {} that already exists", recordPath); if (metrics != null) { metrics.addFailure(monotonicNow() - start); } return false; - } else { + } else { LOG.debug("Not updating {}", record); } } else { @@ -345,36 +411,81 @@ public boolean putAll( } // Write the records - boolean success = true; - for (Entry entry : toWrite.entrySet()) { - String recordPath = entry.getKey(); - String recordPathTemp = recordPath + "." + now() + TMP_MARK; - boolean recordWrittenSuccessfully = true; - try (BufferedWriter writer = getWriter(recordPathTemp)) { - T record = entry.getValue(); - String line = serializeString(record); - writer.write(line); - } catch (IOException e) { - LOG.error("Cannot write {}", recordPathTemp, e); - recordWrittenSuccessfully = false; - success = false; + final AtomicBoolean success = new AtomicBoolean(true); + final List> callables = new ArrayList<>(); + toWrite.entrySet().forEach(entry -> callables.add(() -> writeRecordToFile(success, entry))); + if (this.concurrentStoreAccessPool != null) { + // Write records concurrently + List> futures = null; + try { + futures = this.concurrentStoreAccessPool.invokeAll(callables); + } catch (InterruptedException e) { + success.set(false); + LOG.error("Failed to put record concurrently.", e); } - // Commit - if (recordWrittenSuccessfully && !rename(recordPathTemp, recordPath)) { - LOG.error("Failed committing record into {}", recordPath); - success = false; + if (futures != null) { + for (Future future : futures) { + try { + future.get(); + } catch (InterruptedException | ExecutionException e) { + success.set(false); + LOG.error("Failed to retrieve results from concurrent record put runs.", e); + } + } } + } else { + // Write records serially + callables.forEach(callable -> { + try { + callable.call(); + } catch (Exception e) { + success.set(false); + LOG.error("Failed to put record.", e); + } + }); } long end = monotonicNow(); if (metrics != null) { - if (success) { + if (success.get()) { metrics.addWrite(end - start); } else { metrics.addFailure(end - start); } } - return success; + return success.get(); + } + + /** + * Writes the state store record to the file. At first, the record is written to a temp location + * and then later renamed to the final location that is passed with the entry key. + * + * @param success The atomic boolean that gets updated to false if the file write operation fails. + * @param entry The entry of the record path and the state store record to be written to the file + * by first writing to a temp location and then renaming it to the record path. + * @param Record class of the records. + * @return Void. + */ + private Void writeRecordToFile(AtomicBoolean success, + Entry entry) { + String recordPath = entry.getKey(); + String recordPathTemp = recordPath + "." + now() + TMP_MARK; + boolean recordWrittenSuccessfully = true; + try (BufferedWriter writer = getWriter(recordPathTemp)) { + T record = entry.getValue(); + String line = serializeString(record); + writer.write(line); + } catch (IOException e) { + LOG.error("Cannot write {}", recordPathTemp, e); + recordWrittenSuccessfully = false; + success.set(false); + } + // Commit + if (recordWrittenSuccessfully && !rename(recordPathTemp, recordPath)) { + LOG.error("Failed committing record into {}", recordPath); + success.set(false); + } + return null; } @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileImpl.java index 6ca2663716..1df26e0784 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileImpl.java @@ -109,6 +109,12 @@ protected String getRootDir() { return this.rootDirectory; } + @Override + protected int getConcurrentFilesAccessNumThreads() { + return getConf().getInt(RBFConfigKeys.FEDERATION_STORE_FILE_ASYNC_THREADS, + RBFConfigKeys.FEDERATION_STORE_FILE_ASYNC_THREADS_DEFAULT); + } + @Override protected BufferedReader getReader(String filename) { BufferedReader reader = null; @@ -144,6 +150,7 @@ public BufferedWriter getWriter(String filename) { @Override public void close() throws Exception { + super.close(); setInitialized(false); } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileSystemImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileSystemImpl.java index ee34d8a4ca..d05682398e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileSystemImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileSystemImpl.java @@ -45,7 +45,7 @@ /** * {@link StateStoreDriver} implementation based on a filesystem. The common * implementation uses HDFS as a backend. The path can be specified setting - * dfs.federation.router.driver.fs.path=hdfs://host:port/path/to/store. + * dfs.federation.router.store.driver.fs.path=hdfs://host:port/path/to/store. */ public class StateStoreFileSystemImpl extends StateStoreFileBaseImpl { @@ -117,8 +117,15 @@ protected String getRootDir() { return this.workPath; } + @Override + protected int getConcurrentFilesAccessNumThreads() { + return getConf().getInt(RBFConfigKeys.FEDERATION_STORE_FS_ASYNC_THREADS, + RBFConfigKeys.FEDERATION_STORE_FS_ASYNC_THREADS_DEFAULT); + } + @Override public void close() throws Exception { + super.close(); if (fs != null) { fs.close(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml index 79a16cc202..780fb76a2d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml @@ -894,4 +894,32 @@ If this is below 0, the auto-refresh is disabled. + + + dfs.federation.router.store.driver.file.async.threads + 0 + + Max threads used by StateStoreFileImpl to access state store files concurrently. + The only class currently being supported: + org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreFileImpl. + Default value is 0, which means StateStoreFileImpl would work in sync mode, meaning it + would access one file at a time. + Use positive integer value to enable concurrent files access. + + + + + dfs.federation.router.store.driver.fs.async.threads + 0 + + Max threads used by StateStoreFileSystemImpl to access state store files from the given + filesystem concurrently. + The only class currently being supported: + org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreFileSystemImpl. + Default value is 0, which means StateStoreFileSystemImpl would work in sync mode, meaning it + would access one file from the filesystem at a time. + Use positive integer value to enable concurrent files access from the given filesystem. + + + diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java index 48d84f9326..73d0774ace 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java @@ -94,6 +94,7 @@ public void cleanMetrics() { public static void tearDownCluster() { if (stateStore != null) { stateStore.stop(); + stateStore = null; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreFile.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreFile.java index b01500b2ea..5b5b3fc1f8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreFile.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreFile.java @@ -18,31 +18,55 @@ package org.apache.hadoop.hdfs.server.federation.store.driver; import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.getStateStoreConfiguration; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.FEDERATION_STORE_FILE_ASYNC_THREADS; import java.io.IOException; +import java.util.Arrays; +import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreFileImpl; + +import org.junit.After; import org.junit.Before; -import org.junit.BeforeClass; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; /** * Test the FileSystem (e.g., HDFS) implementation of the State Store driver. */ +@RunWith(Parameterized.class) public class TestStateStoreFile extends TestStateStoreDriverBase { - @BeforeClass - public static void setupCluster() throws Exception { + private final String numFileAsyncThreads; + + public TestStateStoreFile(String numFileAsyncThreads) { + this.numFileAsyncThreads = numFileAsyncThreads; + } + + @Parameterized.Parameters(name = "numFileAsyncThreads-{0}") + public static List data() { + return Arrays.asList(new String[][] {{"20"}, {"0"}}); + } + + private static void setupCluster(String numFsAsyncThreads) throws Exception { Configuration conf = getStateStoreConfiguration(StateStoreFileImpl.class); + conf.setInt(FEDERATION_STORE_FILE_ASYNC_THREADS, Integer.parseInt(numFsAsyncThreads)); getStateStore(conf); } @Before - public void startup() throws IOException { + public void startup() throws Exception { + setupCluster(numFileAsyncThreads); removeAll(getStateStoreDriver()); } + @After + public void tearDown() throws Exception { + tearDownCluster(); + } + @Test public void testInsert() throws IllegalArgumentException, IllegalAccessException, IOException { diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreFileSystem.java index 8c06e6b8ed..4d383ae63f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreFileSystem.java @@ -19,6 +19,8 @@ import java.io.BufferedWriter; import java.io.IOException; +import java.util.Arrays; +import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.MiniDFSCluster; @@ -26,12 +28,15 @@ import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreFileBaseImpl; import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreFileSystemImpl; import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState; -import org.junit.AfterClass; + +import org.junit.After; import org.junit.Before; -import org.junit.BeforeClass; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import org.mockito.stubbing.Answer; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.FEDERATION_STORE_FS_ASYNC_THREADS; import static org.mockito.Mockito.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; @@ -41,16 +46,22 @@ /** * Test the FileSystem (e.g., HDFS) implementation of the State Store driver. */ +@RunWith(Parameterized.class) public class TestStateStoreFileSystem extends TestStateStoreDriverBase { private static MiniDFSCluster dfsCluster; - @BeforeClass - public static void setupCluster() throws Exception { - Configuration conf = FederationStateStoreTestUtils - .getStateStoreConfiguration(StateStoreFileSystemImpl.class); - conf.set(StateStoreFileSystemImpl.FEDERATION_STORE_FS_PATH, - "/hdfs-federation/"); + private final String numFsAsyncThreads; + + public TestStateStoreFileSystem(String numFsAsyncThreads) { + this.numFsAsyncThreads = numFsAsyncThreads; + } + + private static void setupCluster(String numFsAsyncThreads) throws Exception { + Configuration conf = + FederationStateStoreTestUtils.getStateStoreConfiguration(StateStoreFileSystemImpl.class); + conf.set(StateStoreFileSystemImpl.FEDERATION_STORE_FS_PATH, "/hdfs-federation/"); + conf.setInt(FEDERATION_STORE_FS_ASYNC_THREADS, Integer.parseInt(numFsAsyncThreads)); // Create HDFS cluster to back the state tore MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf); @@ -60,18 +71,26 @@ public static void setupCluster() throws Exception { getStateStore(conf); } - @AfterClass - public static void tearDownCluster() { - if (dfsCluster != null) { - dfsCluster.shutdown(); - } + @Parameterized.Parameters(name = "numFsAsyncThreads-{0}") + public static List data() { + return Arrays.asList(new String[][] {{"20"}, {"0"}}); } @Before - public void startup() throws IOException { + public void startup() throws Exception { + setupCluster(numFsAsyncThreads); removeAll(getStateStoreDriver()); } + @After + public void tearDown() throws Exception { + tearDownCluster(); + if (dfsCluster != null) { + dfsCluster.shutdown(); + dfsCluster = null; + } + } + @Test public void testInsert() throws IllegalArgumentException, IllegalAccessException, IOException {