HDFS-17041. RBF: Fix putAll impl for mysql and file based state stores (#5723)
commit f0c4286e3e (parent 3fbadc5d50)
@@ -386,6 +386,9 @@ public <T extends BaseRecord> StateStoreOperationResult putAll(
 
     // Check if any record exists
     Map<String, T> toWrite = new HashMap<>();
+    final List<String> failedRecordsKeys = Collections.synchronizedList(new ArrayList<>());
+    final AtomicBoolean success = new AtomicBoolean(true);
+
     for (T record : records) {
       Class<? extends BaseRecord> recordClass = record.getClass();
       String path = getPathForClass(recordClass);
@@ -400,10 +403,8 @@ public <T extends BaseRecord> StateStoreOperationResult putAll(
         toWrite.put(recordPath, record);
       } else if (errorIfExists) {
         LOG.error("Attempt to insert record {} that already exists", recordPath);
-        if (metrics != null) {
-          metrics.addFailure(monotonicNow() - start);
-        }
-        return new StateStoreOperationResult(getOriginalPrimaryKey(primaryKey));
+        failedRecordsKeys.add(getOriginalPrimaryKey(primaryKey));
+        success.set(false);
       } else {
         LOG.debug("Not updating {}", record);
       }
@@ -413,9 +414,7 @@ public <T extends BaseRecord> StateStoreOperationResult putAll(
     }
 
     // Write the records
-    final AtomicBoolean success = new AtomicBoolean(true);
     final List<Callable<Void>> callables = new ArrayList<>();
-    final List<String> failedRecordsKeys = Collections.synchronizedList(new ArrayList<>());
     toWrite.entrySet().forEach(
         entry -> callables.add(() -> writeRecordToFile(success, entry, failedRecordsKeys)));
     if (this.concurrentStoreAccessPool != null) {
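Taken together, the three hunks above (which appear to belong to the file-based driver, judging by getPathForClass and writeRecordToFile) replace the early return on a conflicting record with a collect-and-continue pattern: the failure list and success flag are hoisted above the validation loop, each conflict only records its key and clears the flag, and the same list and flag are then shared with the concurrent write phase. A minimal self-contained sketch of that pattern follows; BatchResult and the in-memory store map are hypothetical stand-ins, not the actual Hadoop classes.

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

public class PutAllSketch {

  /** Hypothetical stand-in for StateStoreOperationResult: overall flag plus failed keys. */
  public record BatchResult(boolean success, List<String> failedKeys) { }

  /**
   * Insert every record into the store map, collecting conflicting keys
   * instead of aborting on the first one.
   */
  public static BatchResult putAll(Map<String, String> store,
      Map<String, String> records, boolean errorIfExists) {
    // Thread-safe accumulators, declared up front so a later concurrent
    // write phase could share them, as the file-based driver does.
    final List<String> failedKeys = Collections.synchronizedList(new ArrayList<>());
    final AtomicBoolean success = new AtomicBoolean(true);

    for (Map.Entry<String, String> entry : records.entrySet()) {
      if (store.containsKey(entry.getKey()) && errorIfExists) {
        // Remember the conflict and keep processing the rest of the batch.
        failedKeys.add(entry.getKey());
        success.set(false);
      } else {
        // Insert or update the record.
        store.put(entry.getKey(), entry.getValue());
      }
    }
    return new BatchResult(success.get(), failedKeys);
  }

  public static void main(String[] args) {
    Map<String, String> store = new HashMap<>(Map.of("k1", "old"));
    BatchResult result = putAll(store, Map.of("k1", "new", "k2", "v2"), true);
    System.out.println(result.success() + " " + result.failedKeys()); // false [k1]
  }
}

Collecting keys instead of returning early means a single batch reports every conflicting record, which is what the new result3 assertions in the test below verify.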
@@ -194,10 +194,8 @@ public <T extends BaseRecord> StateStoreOperationResult putAll(
         if (errorIfExists) {
           LOG.error("Attempted to insert record {} that already exists "
               + "in table {} and updates are disallowed.", primaryKey, tableName);
-          if (metrics != null) {
-            metrics.addFailure(Time.monotonicNow() - start);
-          }
-          return new StateStoreOperationResult(getOriginalPrimaryKey(primaryKey));
+          failedRecordsKeys.add(getOriginalPrimaryKey(primaryKey));
+          success = false;
         } else {
           LOG.debug("Not updating {} as updates are not allowed", record);
         }
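The MySQL driver gets the same treatment in its serial insert loop: a plain boolean and the failure list replace the early return, so every duplicate primary key in the batch is reported. Purely as an illustration of the same collect-and-continue idea over plain JDBC, here is a sketch that catches the per-row constraint violation and keeps going; the table and column names are made up, and the real StateStoreMySQLImpl may detect duplicates differently.

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.SQLIntegrityConstraintViolationException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class JdbcPutAllSketch {

  /**
   * Insert each key/value pair as a row, collecting keys that hit a
   * duplicate-key constraint instead of aborting the whole batch.
   * Returns the keys that could not be inserted.
   */
  public static List<String> insertAll(Connection conn, Map<String, String> records)
      throws SQLException {
    List<String> failedKeys = new ArrayList<>();
    // Hypothetical table and columns; adjust to the real schema.
    String sql = "INSERT INTO state_store_records (pkey, data) VALUES (?, ?)";
    try (PreparedStatement ps = conn.prepareStatement(sql)) {
      for (Map.Entry<String, String> e : records.entrySet()) {
        ps.setString(1, e.getKey());
        ps.setString(2, e.getValue());
        try {
          ps.executeUpdate();
        } catch (SQLIntegrityConstraintViolationException dup) {
          // Duplicate primary key: record it and continue with the rest.
          failedKeys.add(e.getKey());
        }
      }
    }
    return failedKeys;
  }
}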
@@ -32,6 +32,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Objects;
 import java.util.Random;
 
 import org.apache.hadoop.conf.Configuration;
@@ -316,6 +317,15 @@ public <T extends BaseRecord> void testPut(
     assertEquals(1, result2.getFailedRecordsKeys().size());
     assertEquals(insertList.get(0).getPrimaryKey(), result2.getFailedRecordsKeys().get(0));
 
+    StateStoreOperationResult result3 = driver.putAll(insertList.subList(0, 2), false, true);
+    assertFalse(result3.isOperationSuccessful());
+    assertEquals(2, result3.getFailedRecordsKeys().size());
+    assertTrue(insertList.stream()
+        .anyMatch(t -> Objects.equals(result3.getFailedRecordsKeys().get(0), t.getPrimaryKey())));
+    assertTrue(insertList.stream()
+        .anyMatch(t -> Objects.equals(result3.getFailedRecordsKeys().get(1), t.getPrimaryKey())));
+
+
     records = driver.get(clazz);
     assertEquals(records.getRecords().size(), 10);
 
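The new result3 assertions use anyMatch rather than comparing against fixed positions, presumably because the order in which the failed keys come back is not fixed across drivers. A small self-contained sketch of the same order-insensitive check, with hypothetical key values:

import java.util.HashSet;
import java.util.List;

public class FailedKeysCheckSketch {

  /** True if both lists contain exactly the same distinct keys, ignoring order. */
  public static boolean sameKeys(List<String> actual, List<String> expected) {
    return actual.size() == expected.size()
        && new HashSet<>(actual).equals(new HashSet<>(expected));
  }

  public static void main(String[] args) {
    List<String> failed = List.of("record-1", "record-0");   // order reported by the driver
    List<String> expected = List.of("record-0", "record-1"); // order of insertion
    System.out.println(sameKeys(failed, expected)); // prints: true
  }
}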
@@ -32,7 +32,7 @@
 import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.*;
 
 /**
- * Test the FileSystem (e.g., HDFS) implementation of the State Store driver.
+ * Test the MySQL implementation of the State Store driver.
  */
 public class TestStateStoreMySQL extends TestStateStoreDriverBase {
   private static final String CONNECTION_URL = "jdbc:derby:memory:StateStore";