YARN-9998. Code cleanup in LeveldbConfigurationStore. Contributed by Benjamin Teke

This commit is contained in:
Szilard Nemeth 2020-03-26 12:18:15 +01:00
parent 0fa7bf47df
commit 348685dcb9

View File

@ -54,6 +54,7 @@
import java.util.Map; import java.util.Map;
import java.util.Timer; import java.util.Timer;
import java.util.TimerTask; import java.util.TimerTask;
import java.util.function.Consumer;
import static org.fusesource.leveldbjni.JniDBFactory.bytes; import static org.fusesource.leveldbjni.JniDBFactory.bytes;
@ -70,23 +71,23 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
private static final String VERSION_KEY = "version"; private static final String VERSION_KEY = "version";
private static final String CONF_VERSION_NAME = "conf-version-store"; private static final String CONF_VERSION_NAME = "conf-version-store";
private static final String CONF_VERSION_KEY = "conf-version"; private static final String CONF_VERSION_KEY = "conf-version";
private DB db; private DB db;
private DB versiondb; private DB versionDb;
private long maxLogs; private long maxLogs;
private Configuration conf; private Configuration conf;
private Configuration initSchedConf;
@VisibleForTesting @VisibleForTesting
protected static final Version CURRENT_VERSION_INFO = Version protected static final Version CURRENT_VERSION_INFO = Version
.newInstance(0, 1); .newInstance(0, 1);
private Timer compactionTimer;
private long compactionIntervalMsec; private long compactionIntervalMsec;
@Override @Override
public void initialize(Configuration config, Configuration schedConf, public void initialize(Configuration config, Configuration schedConf,
RMContext rmContext) throws IOException { RMContext rmContext) throws IOException {
this.conf = config; this.conf = config;
this.initSchedConf = schedConf;
try { try {
initDatabase(schedConf); initDatabase();
this.maxLogs = config.getLong( this.maxLogs = config.getLong(
YarnConfiguration.RM_SCHEDCONF_MAX_LOGS, YarnConfiguration.RM_SCHEDCONF_MAX_LOGS,
YarnConfiguration.DEFAULT_RM_SCHEDCONF_LEVELDB_MAX_LOGS); YarnConfiguration.DEFAULT_RM_SCHEDCONF_LEVELDB_MAX_LOGS);
@ -107,7 +108,15 @@ public void format() throws Exception {
fs.delete(getStorageDir(DB_NAME), true); fs.delete(getStorageDir(DB_NAME), true);
} }
private void initDatabase(Configuration config) throws Exception { private void initDatabase() throws Exception {
Path confVersion = createStorageDir(CONF_VERSION_NAME);
Options confOptions = new Options();
confOptions.createIfMissing(false);
File confVersionFile = new File(confVersion.toString());
versionDb = initDatabaseHelper(confVersionFile, confOptions,
this::initVersionDb);
Path storeRoot = createStorageDir(DB_NAME); Path storeRoot = createStorageDir(DB_NAME);
Options options = new Options(); Options options = new Options();
options.createIfMissing(false); options.createIfMissing(false);
@ -143,49 +152,37 @@ public byte[] findShortSuccessor(byte[] key) {
return key; return key;
} }
}); });
LOG.info("Using conf database at {}", storeRoot);
Path confVersion = createStorageDir(CONF_VERSION_NAME); File dbFile = new File(storeRoot.toString());
Options confOptions = new Options(); db = initDatabaseHelper(dbFile, options, this::initDb);
confOptions.createIfMissing(false);
LOG.info("Using conf version at " + confVersion);
File confVersionFile = new File(confVersion.toString());
try {
versiondb = JniDBFactory.factory.open(confVersionFile, confOptions);
} catch (NativeDB.DBException e) {
if (e.isNotFound() || e.getMessage().contains(" does not exist ")) {
LOG.info("Creating conf version at " + confVersionFile);
confOptions.createIfMissing(true);
try {
versiondb = JniDBFactory.factory.open(confVersionFile, confOptions);
versiondb.put(bytes(CONF_VERSION_KEY), bytes(String.valueOf(0)));
} catch (DBException dbErr) {
throw new IOException(dbErr.getMessage(), dbErr);
}
} else {
throw e;
}
} }
private void initVersionDb(DB database) {
database.put(bytes(CONF_VERSION_KEY), bytes(String.valueOf(0)));
}
LOG.info("Using conf database at " + storeRoot); private void initDb(DB database) {
File dbfile = new File(storeRoot.toString()); WriteBatch initBatch = database.createWriteBatch();
try { for (Map.Entry<String, String> kv : initSchedConf) {
db = JniDBFactory.factory.open(dbfile, options);
} catch (NativeDB.DBException e) {
if (e.isNotFound() || e.getMessage().contains(" does not exist ")) {
LOG.info("Creating conf database at " + dbfile);
options.createIfMissing(true);
try {
db = JniDBFactory.factory.open(dbfile, options);
// Write the initial scheduler configuration
WriteBatch initBatch = db.createWriteBatch();
for (Map.Entry<String, String> kv : config) {
initBatch.put(bytes(kv.getKey()), bytes(kv.getValue())); initBatch.put(bytes(kv.getKey()), bytes(kv.getValue()));
} }
db.write(initBatch); database.write(initBatch);
long configVersion = getConfigVersion() + 1L; increaseConfigVersion();
versiondb.put(bytes(CONF_VERSION_KEY), }
bytes(String.valueOf(configVersion)));
private DB initDatabaseHelper(File configurationFile, Options options,
Consumer<DB> initMethod) throws Exception {
DB database;
try {
database = JniDBFactory.factory.open(configurationFile, options);
} catch (NativeDB.DBException e) {
if (e.isNotFound() || e.getMessage().contains(" does not exist ")) {
LOG.info("Creating configuration version/database at {}",
configurationFile);
options.createIfMissing(true);
try {
database = JniDBFactory.factory.open(configurationFile, options);
initMethod.accept(database);
} catch (DBException dbErr) { } catch (DBException dbErr) {
throw new IOException(dbErr.getMessage(), dbErr); throw new IOException(dbErr.getMessage(), dbErr);
} }
@ -193,6 +190,8 @@ public byte[] findShortSuccessor(byte[] key) {
throw e; throw e;
} }
} }
return database;
} }
private Path createStorageDir(String storageName) throws IOException { private Path createStorageDir(String storageName) throws IOException {
@ -216,8 +215,8 @@ public void close() throws IOException {
if (db != null) { if (db != null) {
db.close(); db.close();
} }
if (versiondb != null) { if (versionDb != null) {
versiondb.close(); versionDb.close();
} }
} }
@ -235,7 +234,7 @@ public void logMutation(LogMutation logMutation) throws IOException {
@Override @Override
public void confirmMutation(LogMutation pendingMutation, public void confirmMutation(LogMutation pendingMutation,
boolean isValid) throws IOException { boolean isValid) {
WriteBatch updateBatch = db.createWriteBatch(); WriteBatch updateBatch = db.createWriteBatch();
if (isValid) { if (isValid) {
for (Map.Entry<String, String> changes : for (Map.Entry<String, String> changes :
@ -246,9 +245,7 @@ public void confirmMutation(LogMutation pendingMutation,
updateBatch.put(bytes(changes.getKey()), bytes(changes.getValue())); updateBatch.put(bytes(changes.getKey()), bytes(changes.getValue()));
} }
} }
long configVersion = getConfigVersion() + 1L; increaseConfigVersion();
versiondb.put(bytes(CONF_VERSION_KEY),
bytes(String.valueOf(configVersion)));
} }
db.write(updateBatch); db.write(updateBatch);
} }
@ -263,11 +260,16 @@ private byte[] serLogMutations(LinkedList<LogMutation> mutations) throws
} }
} }
// Because of type erasure casting to LinkedList<LogMutation> will be
// unchecked. A way around that would be to iterate over the logMutations
// which is overkill in this case.
@SuppressWarnings("unchecked")
private LinkedList<LogMutation> deserLogMutations(byte[] mutations) throws private LinkedList<LogMutation> deserLogMutations(byte[] mutations) throws
IOException { IOException {
if (mutations == null) { if (mutations == null) {
return new LinkedList<>(); return new LinkedList<>();
} }
try (ObjectInput input = new ObjectInputStream( try (ObjectInput input = new ObjectInputStream(
new ByteArrayInputStream(mutations))) { new ByteArrayInputStream(mutations))) {
return (LinkedList<LogMutation>) input.readObject(); return (LinkedList<LogMutation>) input.readObject();
@ -293,9 +295,15 @@ public synchronized Configuration retrieve() {
return config; return config;
} }
private void increaseConfigVersion() {
long configVersion = getConfigVersion() + 1L;
versionDb.put(bytes(CONF_VERSION_KEY),
bytes(String.valueOf(configVersion)));
}
@Override @Override
public long getConfigVersion() { public long getConfigVersion() {
String version = new String(versiondb.get(bytes(CONF_VERSION_KEY)), String version = new String(versionDb.get(bytes(CONF_VERSION_KEY)),
StandardCharsets.UTF_8); StandardCharsets.UTF_8);
return Long.parseLong(version); return Long.parseLong(version);
} }
@ -305,18 +313,15 @@ public List<LogMutation> getConfirmedConfHistory(long fromId) {
return null; // unimplemented return null; // unimplemented
} }
// TODO below was taken from LeveldbRMStateStore, it can probably be
// refactored
private void startCompactionTimer() { private void startCompactionTimer() {
if (compactionIntervalMsec > 0) { if (compactionIntervalMsec > 0) {
compactionTimer = new Timer( Timer compactionTimer = new Timer(
this.getClass().getSimpleName() + " compaction timer", true); this.getClass().getSimpleName() + " compaction timer", true);
compactionTimer.schedule(new CompactionTimerTask(), compactionTimer.schedule(new CompactionTimerTask(),
compactionIntervalMsec, compactionIntervalMsec); compactionIntervalMsec, compactionIntervalMsec);
} }
} }
// TODO: following is taken from LeveldbRMStateStore
@Override @Override
public Version getConfStoreVersion() throws Exception { public Version getConfStoreVersion() throws Exception {
Version version = null; Version version = null;
@ -375,7 +380,7 @@ public void run() {
LOG.error("Error compacting database", e); LOG.error("Error compacting database", e);
} }
long duration = Time.monotonicNow() - start; long duration = Time.monotonicNow() - start;
LOG.info("Full compaction cycle completed in " + duration + " msec"); LOG.info("Full compaction cycle completed in {} msec", duration);
} }
} }
} }