diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java index f90bdea9e2..8f5dc6a273 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java @@ -54,6 +54,7 @@ import java.util.Map; import java.util.Timer; import java.util.TimerTask; +import java.util.function.Consumer; import static org.fusesource.leveldbjni.JniDBFactory.bytes; @@ -70,23 +71,23 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore { private static final String VERSION_KEY = "version"; private static final String CONF_VERSION_NAME = "conf-version-store"; private static final String CONF_VERSION_KEY = "conf-version"; - private DB db; - private DB versiondb; + private DB versionDb; private long maxLogs; private Configuration conf; + private Configuration initSchedConf; @VisibleForTesting protected static final Version CURRENT_VERSION_INFO = Version .newInstance(0, 1); - private Timer compactionTimer; private long compactionIntervalMsec; @Override public void initialize(Configuration config, Configuration schedConf, RMContext rmContext) throws IOException { this.conf = config; + this.initSchedConf = schedConf; try { - initDatabase(schedConf); + initDatabase(); this.maxLogs = config.getLong( YarnConfiguration.RM_SCHEDCONF_MAX_LOGS, YarnConfiguration.DEFAULT_RM_SCHEDCONF_LEVELDB_MAX_LOGS); @@ -107,7 +108,15 @@ public void format() throws Exception { fs.delete(getStorageDir(DB_NAME), true); } - private void initDatabase(Configuration config) throws Exception { + private void initDatabase() throws Exception { + Path confVersion = createStorageDir(CONF_VERSION_NAME); + Options confOptions = new Options(); + confOptions.createIfMissing(false); + File confVersionFile = new File(confVersion.toString()); + + versionDb = initDatabaseHelper(confVersionFile, confOptions, + this::initVersionDb); + Path storeRoot = createStorageDir(DB_NAME); Options options = new Options(); options.createIfMissing(false); @@ -143,49 +152,37 @@ public byte[] findShortSuccessor(byte[] key) { return key; } }); + LOG.info("Using conf database at {}", storeRoot); + File dbFile = new File(storeRoot.toString()); + db = initDatabaseHelper(dbFile, options, this::initDb); + } - Path confVersion = createStorageDir(CONF_VERSION_NAME); - Options confOptions = new Options(); - confOptions.createIfMissing(false); - LOG.info("Using conf version at " + confVersion); - File confVersionFile = new File(confVersion.toString()); - try { - versiondb = JniDBFactory.factory.open(confVersionFile, confOptions); - } catch (NativeDB.DBException e) { - if (e.isNotFound() || e.getMessage().contains(" does not exist ")) { - LOG.info("Creating conf version at " + confVersionFile); - confOptions.createIfMissing(true); - try { - versiondb = JniDBFactory.factory.open(confVersionFile, confOptions); - versiondb.put(bytes(CONF_VERSION_KEY), bytes(String.valueOf(0))); - } catch (DBException dbErr) { - throw new IOException(dbErr.getMessage(), dbErr); - } - } else { - throw e; - } + private void initVersionDb(DB database) { + database.put(bytes(CONF_VERSION_KEY), bytes(String.valueOf(0))); + } + + private void initDb(DB database) { + WriteBatch initBatch = database.createWriteBatch(); + for (Map.Entry kv : initSchedConf) { + initBatch.put(bytes(kv.getKey()), bytes(kv.getValue())); } + database.write(initBatch); + increaseConfigVersion(); + } - - LOG.info("Using conf database at " + storeRoot); - File dbfile = new File(storeRoot.toString()); + private DB initDatabaseHelper(File configurationFile, Options options, + Consumer initMethod) throws Exception { + DB database; try { - db = JniDBFactory.factory.open(dbfile, options); + database = JniDBFactory.factory.open(configurationFile, options); } catch (NativeDB.DBException e) { if (e.isNotFound() || e.getMessage().contains(" does not exist ")) { - LOG.info("Creating conf database at " + dbfile); + LOG.info("Creating configuration version/database at {}", + configurationFile); options.createIfMissing(true); try { - db = JniDBFactory.factory.open(dbfile, options); - // Write the initial scheduler configuration - WriteBatch initBatch = db.createWriteBatch(); - for (Map.Entry kv : config) { - initBatch.put(bytes(kv.getKey()), bytes(kv.getValue())); - } - db.write(initBatch); - long configVersion = getConfigVersion() + 1L; - versiondb.put(bytes(CONF_VERSION_KEY), - bytes(String.valueOf(configVersion))); + database = JniDBFactory.factory.open(configurationFile, options); + initMethod.accept(database); } catch (DBException dbErr) { throw new IOException(dbErr.getMessage(), dbErr); } @@ -193,6 +190,8 @@ public byte[] findShortSuccessor(byte[] key) { throw e; } } + + return database; } private Path createStorageDir(String storageName) throws IOException { @@ -216,8 +215,8 @@ public void close() throws IOException { if (db != null) { db.close(); } - if (versiondb != null) { - versiondb.close(); + if (versionDb != null) { + versionDb.close(); } } @@ -235,7 +234,7 @@ public void logMutation(LogMutation logMutation) throws IOException { @Override public void confirmMutation(LogMutation pendingMutation, - boolean isValid) throws IOException { + boolean isValid) { WriteBatch updateBatch = db.createWriteBatch(); if (isValid) { for (Map.Entry changes : @@ -246,9 +245,7 @@ public void confirmMutation(LogMutation pendingMutation, updateBatch.put(bytes(changes.getKey()), bytes(changes.getValue())); } } - long configVersion = getConfigVersion() + 1L; - versiondb.put(bytes(CONF_VERSION_KEY), - bytes(String.valueOf(configVersion))); + increaseConfigVersion(); } db.write(updateBatch); } @@ -263,11 +260,16 @@ private byte[] serLogMutations(LinkedList mutations) throws } } + // Because of type erasure casting to LinkedList will be + // unchecked. A way around that would be to iterate over the logMutations + // which is overkill in this case. + @SuppressWarnings("unchecked") private LinkedList deserLogMutations(byte[] mutations) throws IOException { if (mutations == null) { return new LinkedList<>(); } + try (ObjectInput input = new ObjectInputStream( new ByteArrayInputStream(mutations))) { return (LinkedList) input.readObject(); @@ -293,9 +295,15 @@ public synchronized Configuration retrieve() { return config; } + private void increaseConfigVersion() { + long configVersion = getConfigVersion() + 1L; + versionDb.put(bytes(CONF_VERSION_KEY), + bytes(String.valueOf(configVersion))); + } + @Override public long getConfigVersion() { - String version = new String(versiondb.get(bytes(CONF_VERSION_KEY)), + String version = new String(versionDb.get(bytes(CONF_VERSION_KEY)), StandardCharsets.UTF_8); return Long.parseLong(version); } @@ -305,18 +313,15 @@ public List getConfirmedConfHistory(long fromId) { return null; // unimplemented } - // TODO below was taken from LeveldbRMStateStore, it can probably be - // refactored private void startCompactionTimer() { if (compactionIntervalMsec > 0) { - compactionTimer = new Timer( + Timer compactionTimer = new Timer( this.getClass().getSimpleName() + " compaction timer", true); compactionTimer.schedule(new CompactionTimerTask(), compactionIntervalMsec, compactionIntervalMsec); } } - // TODO: following is taken from LeveldbRMStateStore @Override public Version getConfStoreVersion() throws Exception { Version version = null; @@ -375,7 +380,7 @@ public void run() { LOG.error("Error compacting database", e); } long duration = Time.monotonicNow() - start; - LOG.info("Full compaction cycle completed in " + duration + " msec"); + LOG.info("Full compaction cycle completed in {} msec", duration); } } }