diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/CachedRecordStore.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/CachedRecordStore.java
index 59da614535..3a2995eba2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/CachedRecordStore.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/CachedRecordStore.java
@@ -21,6 +21,7 @@
 import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -172,7 +173,7 @@ private boolean isUpdateTime() {
    */
   public void overrideExpiredRecords(QueryResult<R> query) throws IOException {
     List<R> commitRecords = new ArrayList<>();
-    List<R> deleteRecords = new ArrayList<>();
+    List<R> toDeleteRecords = new ArrayList<>();
     List<R> newRecords = query.getRecords();
     long currentDriverTime = query.getTimestamp();
     if (newRecords == null || currentDriverTime <= 0) {
@@ -182,13 +183,8 @@ public void overrideExpiredRecords(QueryResult<R> query) throws IOException {
     for (R record : newRecords) {
       if (record.shouldBeDeleted(currentDriverTime)) {
         String recordName = StateStoreUtils.getRecordName(record.getClass());
-        if (getDriver().remove(record)) {
-          deleteRecords.add(record);
-          LOG.info("Deleted State Store record {}: {}", recordName, record);
-        } else {
-          LOG.warn("Couldn't delete State Store record {}: {}", recordName,
-              record);
-        }
+        LOG.info("State Store record to delete {}: {}", recordName, record);
+        toDeleteRecords.add(record);
       } else if (!record.isExpired() && record.checkExpired(currentDriverTime)) {
         String recordName = StateStoreUtils.getRecordName(record.getClass());
         LOG.info("Override State Store record {}: {}", recordName, record);
@@ -198,8 +194,12 @@ public void overrideExpiredRecords(QueryResult<R> query) throws IOException {
     if (commitRecords.size() > 0) {
       getDriver().putAll(commitRecords, true, false);
     }
-    if (deleteRecords.size() > 0) {
-      newRecords.removeAll(deleteRecords);
+    if (!toDeleteRecords.isEmpty()) {
+      for (Map.Entry<R, Boolean> entry : getDriver().removeMultiple(toDeleteRecords).entrySet()) {
+        if (entry.getValue()) {
+          newRecords.remove(entry.getKey());
+        }
+      }
     }
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreRecordOperations.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreRecordOperations.java
index 716f41daf4..97f6c680a4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreRecordOperations.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreRecordOperations.java
@@ -19,6 +19,7 @@
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -127,6 +128,17 @@ StateStoreOperationResult putAll(
   @AtMostOnce
   <T extends BaseRecord> boolean remove(T record) throws IOException;
 
+  /**
+   * Remove multiple records.
+   *
+   * @param <T> Record class of the records.
+   * @param records Records to be removed.
+   * @return Map of record to a boolean indicating if the record has been removed successfully.
+   * @throws IOException Throws exception if unable to query the data store.
+   */
+  @AtMostOnce
+  <T extends BaseRecord> Map<T, Boolean> removeMultiple(List<T> records) throws IOException;
+
   /**
    * Remove all records of this class from the store.
    *
@@ -152,4 +164,17 @@ StateStoreOperationResult putAll(
   <T extends BaseRecord> int remove(Class<T> clazz, Query<T> query)
       throws IOException;
 
+  /**
+   * Remove all records of a specific class that match any query in a list of queries.
+   * Requires the getAll implementation to fetch fresh records on each call.
+   *
+   * @param clazz The class to match the records with.
+   * @param queries Queries (logical OR) to filter what to remove.
+   * @param <T> Record class of the records.
+   * @return Map of query to number of records removed by that query.
+   * @throws IOException Throws exception if unable to query the data store.
+   */
+  @AtMostOnce
+  <T extends BaseRecord> Map<Query<T>, Integer> remove(Class<T> clazz, List<Query<T>> queries)
+      throws IOException;
 }
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreBaseImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreBaseImpl.java
index df3ce21dee..93ad279e18 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreBaseImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreBaseImpl.java
@@ -21,7 +21,10 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -86,4 +89,37 @@ public <T extends BaseRecord> boolean remove(T record) throws IOException {
     Class<T> recordClass = (Class<T>)StateStoreUtils.getRecordClass(clazz);
     return remove(recordClass, query) == 1;
   }
+
+  @Override
+  public <T extends BaseRecord> Map<T, Boolean> removeMultiple(List<T> records) throws IOException {
+    assert !records.isEmpty();
+    // Fall back to iterative remove() calls if the records do not all share one class
+    Class<? extends BaseRecord> expectedClazz = records.get(0).getClass();
+    if (!records.stream().allMatch(x -> x.getClass() == expectedClazz)) {
+      Map<T, Boolean> result = new HashMap<>();
+      for (T record : records) {
+        result.put(record, remove(record));
+      }
+      return result;
+    }
+
+    final List<Query<T>> queries = new ArrayList<>();
+    for (T record : records) {
+      queries.add(new Query<>(record));
+    }
+    @SuppressWarnings("unchecked")
+    Class<T> recordClass = (Class<T>) StateStoreUtils.getRecordClass(expectedClazz);
+    Map<Query<T>, Integer> result = remove(recordClass, queries);
+    return result.entrySet().stream()
+        .collect(Collectors.toMap(e -> e.getKey().getPartial(), e -> e.getValue() == 1));
+  }
+
+  public <T extends BaseRecord> Map<Query<T>, Integer> remove(Class<T> clazz,
+      List<Query<T>> queries) throws IOException {
+    Map<Query<T>, Integer> result = new HashMap<>();
+    for (Query<T> query : queries) {
+      result.put(query, remove(clazz, query));
+    }
+    return result;
+  }
 }
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java
index 19a23cb022..0e72cf4175 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java
@@ -25,7 +25,11 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -284,38 +288,47 @@ public StateStoreOperationResult putAll(
   }
 
   @Override
-  public <T extends BaseRecord> int remove(
-      Class<T> clazz, Query<T> query) throws IOException {
+  public <T extends BaseRecord> Map<Query<T>, Integer> remove(Class<T> clazz,
+      List<Query<T>> queries) throws IOException {
     verifyDriverReady();
-    if (query == null) {
-      return 0;
+    // Track how many entries are deleted by each query
+    Map<Query<T>, Integer> ret = new HashMap<>();
+    final List<T> trueRemoved = Collections.synchronizedList(new ArrayList<>());
+    if (queries.isEmpty()) {
+      return ret;
     }
 
     // Read the current data
     long start = monotonicNow();
-    List<T> records = null;
+    List<T> records;
     try {
       QueryResult<T> result = get(clazz);
       records = result.getRecords();
     } catch (IOException ex) {
       LOG.error("Cannot get existing records", ex);
       getMetrics().addFailure(monotonicNow() - start);
-      return 0;
+      return ret;
     }
 
     // Check the records to remove
     String znode = getZNodeForClass(clazz);
-    List<T> recordsToRemove = filterMultiple(query, records);
+    Set<T> recordsToRemove = new HashSet<>();
+    Map<Query<T>, List<T>> queryToRecords = new HashMap<>();
+    for (Query<T> query : queries) {
+      List<T> filtered = filterMultiple(query, records);
+      queryToRecords.put(query, filtered);
+      recordsToRemove.addAll(filtered);
+    }
 
     // Remove the records
-    int removed = 0;
-    for (T existingRecord : recordsToRemove) {
+    List<Callable<Void>> callables = new ArrayList<>();
+    recordsToRemove.forEach(existingRecord -> callables.add(() -> {
       LOG.info("Removing \"{}\"", existingRecord);
       try {
         String primaryKey = getPrimaryKey(existingRecord);
         String path = getNodePath(znode, primaryKey);
         if (zkManager.delete(path)) {
-          removed++;
+          trueRemoved.add(existingRecord);
         } else {
           LOG.error("Did not remove \"{}\"", existingRecord);
         }
@@ -323,12 +336,38 @@ public <T extends BaseRecord> int remove(
         LOG.error("Cannot remove \"{}\"", existingRecord, e);
         getMetrics().addFailure(monotonicNow() - start);
       }
+      return null;
+    }));
+    try {
+      if (enableConcurrent) {
+        executorService.invokeAll(callables);
+      } else {
+        for (Callable<Void> callable : callables) {
+          callable.call();
+        }
+      }
+    } catch (Exception e) {
+      LOG.error("Record removal failed : {}", e.getMessage(), e);
     }
     long end = monotonicNow();
-    if (removed > 0) {
+    if (!trueRemoved.isEmpty()) {
       getMetrics().addRemove(end - start);
     }
-    return removed;
+    // Generate return map
+    for (Map.Entry<Query<T>, List<T>> entry : queryToRecords.entrySet()) {
+      for (T record : entry.getValue()) {
+        if (trueRemoved.contains(record)) {
+          ret.compute(entry.getKey(), (k, v) -> (v == null) ? 1 : v + 1);
+        }
+      }
+    }
+    return ret;
+  }
+
+  @Override
+  public <T extends BaseRecord> int remove(Class<T> clazz, Query<T> query)
+      throws IOException {
+    return remove(clazz, Collections.singletonList(query)).get(query);
   }
 
   @Override
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreZK.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreZK.java
index f94e415b4d..5ddf93e05b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreZK.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreZK.java
@@ -140,17 +140,28 @@ public void testAsyncPerformance() throws Exception {
       insertList.add(newRecord);
     }
     // Insert Multiple on sync mode
-    long startSync = Time.now();
+    long startSyncPut = Time.now();
     stateStoreDriver.putAll(insertList, true, false);
-    long endSync = Time.now();
+    long endSyncPut = Time.now();
+    // Removing 1000 records synchronously is painfully slow, so test with
+    // only 5 records and remove the rest with removeAll().
+    long startSyncRemove = Time.now();
+    for (MountTable entry : insertList.subList(0, 5)) {
+      stateStoreDriver.remove(entry);
+    }
+    long endSyncRemove = Time.now();
     stateStoreDriver.removeAll(MembershipState.class);
 
     stateStoreDriver.setEnableConcurrent(true);
     // Insert Multiple on async mode
-    long startAsync = Time.now();
+    long startAsyncPut = Time.now();
     stateStoreDriver.putAll(insertList, true, false);
-    long endAsync = Time.now();
-    assertTrue((endSync - startSync) > (endAsync - startAsync));
+    long endAsyncPut = Time.now();
+    long startAsyncRemove = Time.now();
+    stateStoreDriver.removeMultiple(insertList.subList(0, 5));
+    long endAsyncRemove = Time.now();
+    assertTrue((endSyncPut - startSyncPut) > (endAsyncPut - startAsyncPut));
+    assertTrue((endSyncRemove - startSyncRemove) > (endAsyncRemove - startAsyncRemove));
   }
 
   @Test
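
Note for reviewers: below is a minimal usage sketch (not part of the patch) of the new removeMultiple() API, mirroring what the CachedRecordStore change above does. The driver, records, driverTime, and LOG identifiers are illustrative stand-ins assumed to be in scope; MembershipState is just one BaseRecord type the State Store manages.

    // Collect the expired records, then delete them in one batch call
    // instead of one remove() round trip per record.
    List<MembershipState> expired = new ArrayList<>();
    for (MembershipState record : records) {
      if (record.shouldBeDeleted(driverTime)) {
        expired.add(record);
      }
    }
    if (!expired.isEmpty()) {
      // removeMultiple() reports per-record success, so the caller can keep
      // records whose deletion failed instead of dropping them blindly.
      Map<MembershipState, Boolean> removed = driver.removeMultiple(expired);
      removed.forEach((record, ok) -> {
        if (!ok) {
          LOG.warn("Could not remove State Store record {}", record);
        }
      });
    }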