From 76900b4f5b7ad8d11baf505abc43f7b24d364071 Mon Sep 17 00:00:00 2001 From: Szilard Nemeth Date: Sat, 18 Apr 2020 09:57:27 +0200 Subject: [PATCH] YARN-10189. Code cleanup in LeveldbRMStateStore. Contributed by Benjamin Teke --- .../server/resourcemanager/DBManager.java | 131 +++++++++++++ .../recovery/LeveldbRMStateStore.java | 176 ++++-------------- .../conf/LeveldbConfigurationStore.java | 105 ++--------- .../recovery/TestLeveldbRMStateStore.java | 21 ++- 4 files changed, 197 insertions(+), 236 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DBManager.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DBManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DBManager.java new file mode 100644 index 0000000000..13529baebe --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DBManager.java @@ -0,0 +1,131 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.util.Time; +import org.apache.hadoop.yarn.proto.YarnServerCommonProtos; +import org.apache.hadoop.yarn.server.records.Version; +import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl; +import org.fusesource.leveldbjni.JniDBFactory; +import org.fusesource.leveldbjni.internal.NativeDB; +import org.iq80.leveldb.DB; +import org.iq80.leveldb.DBException; +import org.iq80.leveldb.Options; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.util.Timer; +import java.util.TimerTask; +import java.util.function.Consumer; + +import static org.fusesource.leveldbjni.JniDBFactory.bytes; + +public class DBManager implements Closeable { + public static final Logger LOG = + LoggerFactory.getLogger(DBManager.class); + private DB db; + private Timer compactionTimer; + + public DB initDatabase(File configurationFile, Options options, + Consumer initMethod) throws Exception { + try { + db = JniDBFactory.factory.open(configurationFile, options); + } catch (NativeDB.DBException e) { + if (e.isNotFound() || e.getMessage().contains(" does not exist ")) { + LOG.info("Creating configuration version/database at {}", + configurationFile); + options.createIfMissing(true); + try { + db = JniDBFactory.factory.open(configurationFile, options); + initMethod.accept(db); + } catch (DBException dbErr) { + throw new IOException(dbErr.getMessage(), dbErr); + } + } else { + throw e; + } + } + + return db; + } + + public void close() throws IOException { + if (compactionTimer != null) { + compactionTimer.cancel(); + compactionTimer = null; + } + if (db != null) { + db.close(); + db = null; + } + } + + public void storeVersion(String versionKey, Version versionValue) { + byte[] data = ((VersionPBImpl) versionValue).getProto().toByteArray(); + db.put(bytes(versionKey), data); + } + + public Version loadVersion(String versionKey) throws Exception { + Version version = null; + try { + byte[] data = db.get(bytes(versionKey)); + if (data != null) { + version = new VersionPBImpl(YarnServerCommonProtos.VersionProto + .parseFrom(data)); + } + } catch (DBException e) { + throw new IOException(e); + } + return version; + } + + @VisibleForTesting + public void setDb(DB db) { + this.db = db; + } + + public void startCompactionTimer(long compactionIntervalMsec, + String className) { + if (compactionIntervalMsec > 0) { + compactionTimer = new Timer( + className + " compaction timer", true); + compactionTimer.schedule(new CompactionTimerTask(), + compactionIntervalMsec, compactionIntervalMsec); + } + } + + private class CompactionTimerTask extends TimerTask { + @Override + public void run() { + long start = Time.monotonicNow(); + LOG.info("Starting full compaction cycle"); + try { + db.compactRange(null, null); + } catch (DBException e) { + LOG.error("Error compacting database", e); + } + long duration = Time.monotonicNow() - start; + LOG.info("Full compaction cycle completed in " + duration + " msec"); + } + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java index 2420735748..2f4d9befb9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java @@ -31,9 +31,8 @@ import java.security.PrivateKey; import java.security.cert.X509Certificate; import java.util.HashMap; import java.util.Map.Entry; -import java.util.Timer; -import java.util.TimerTask; +import org.apache.hadoop.yarn.server.resourcemanager.DBManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -42,13 +41,11 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.security.token.delegation.DelegationKey; -import org.apache.hadoop.util.Time; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; -import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.AMRMTokenSecretManagerStateProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.EpochProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationAttemptStateDataProto; @@ -56,7 +53,6 @@ import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.Appl import org.apache.hadoop.yarn.proto.YarnProtos.ReservationAllocationStateProto; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.records.Version; -import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; @@ -66,8 +62,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.AM import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; import org.apache.hadoop.yarn.server.utils.LeveldbIterator; -import org.fusesource.leveldbjni.JniDBFactory; -import org.fusesource.leveldbjni.internal.NativeDB; import org.iq80.leveldb.DB; import org.iq80.leveldb.DBException; import org.iq80.leveldb.Options; @@ -100,7 +94,7 @@ public class LeveldbRMStateStore extends RMStateStore { .newInstance(1, 1); private DB db; - private Timer compactionTimer; + private DBManager dbManager = new DBManager(); private long compactionIntervalMsec; private String getApplicationNodeKey(ApplicationId appId) { @@ -140,7 +134,7 @@ public class LeveldbRMStateStore extends RMStateStore { } @Override - protected void initInternal(Configuration conf) throws Exception { + protected void initInternal(Configuration conf) { compactionIntervalMsec = conf.getLong( YarnConfiguration.RM_LEVELDB_COMPACTION_INTERVAL_SECS, YarnConfiguration.DEFAULT_RM_LEVELDB_COMPACTION_INTERVAL_SECS) * 1000; @@ -165,55 +159,20 @@ public class LeveldbRMStateStore extends RMStateStore { @Override protected void startInternal() throws Exception { - db = openDatabase(); - startCompactionTimer(); - } - - protected DB openDatabase() throws Exception { Path storeRoot = createStorageDir(); Options options = new Options(); options.createIfMissing(false); LOG.info("Using state database at " + storeRoot + " for recovery"); File dbfile = new File(storeRoot.toString()); - try { - db = JniDBFactory.factory.open(dbfile, options); - } catch (NativeDB.DBException e) { - if (e.isNotFound() || e.getMessage().contains(" does not exist ")) { - LOG.info("Creating state database at " + dbfile); - options.createIfMissing(true); - try { - db = JniDBFactory.factory.open(dbfile, options); - // store version - storeVersion(); - } catch (DBException dbErr) { - throw new IOException(dbErr.getMessage(), dbErr); - } - } else { - throw e; - } - } - return db; - } - - private void startCompactionTimer() { - if (compactionIntervalMsec > 0) { - compactionTimer = new Timer( - this.getClass().getSimpleName() + " compaction timer", true); - compactionTimer.schedule(new CompactionTimerTask(), - compactionIntervalMsec, compactionIntervalMsec); - } + db = dbManager.initDatabase(dbfile, options, (database) -> + storeVersion(CURRENT_VERSION_INFO)); + dbManager.startCompactionTimer(compactionIntervalMsec, + this.getClass().getSimpleName()); } @Override protected void closeInternal() throws Exception { - if (compactionTimer != null) { - compactionTimer.cancel(); - compactionTimer = null; - } - if (db != null) { - db.close(); - db = null; - } + dbManager.close(); } @VisibleForTesting @@ -228,33 +187,22 @@ public class LeveldbRMStateStore extends RMStateStore { @Override protected Version loadVersion() throws Exception { - Version version = null; - try { - byte[] data = db.get(bytes(VERSION_NODE)); - if (data != null) { - version = new VersionPBImpl(VersionProto.parseFrom(data)); - } - } catch (DBException e) { - throw new IOException(e); - } - return version; + return dbManager.loadVersion(VERSION_NODE); } @Override protected void storeVersion() throws Exception { - dbStoreVersion(CURRENT_VERSION_INFO); - } - - void dbStoreVersion(Version state) throws IOException { - String key = VERSION_NODE; - byte[] data = ((VersionPBImpl) state).getProto().toByteArray(); try { - db.put(bytes(key), data); + storeVersion(CURRENT_VERSION_INFO); } catch (DBException e) { throw new IOException(e); } } + protected void storeVersion(Version version) { + dbManager.storeVersion(VERSION_NODE, version); + } + @Override protected Version getCurrentVersion() { return CURRENT_VERSION_INFO; @@ -290,9 +238,7 @@ public class LeveldbRMStateStore extends RMStateStore { private void loadReservationState(RMState rmState) throws IOException { int numReservations = 0; - LeveldbIterator iter = null; - try { - iter = new LeveldbIterator(db); + try (LeveldbIterator iter = new LeveldbIterator(db)) { iter.seek(bytes(RM_RESERVATION_KEY_PREFIX)); while (iter.hasNext()) { Entry entry = iter.next(); @@ -324,10 +270,6 @@ public class LeveldbRMStateStore extends RMStateStore { } } catch (DBException e) { throw new IOException(e); - } finally { - if (iter != null) { - iter.close(); - } } LOG.info("Recovered " + numReservations + " reservations"); } @@ -342,9 +284,7 @@ public class LeveldbRMStateStore extends RMStateStore { private int loadRMDTSecretManagerKeys(RMState state) throws IOException { int numKeys = 0; - LeveldbIterator iter = null; - try { - iter = new LeveldbIterator(db); + try (LeveldbIterator iter = new LeveldbIterator(db)) { iter.seek(bytes(RM_DT_MASTER_KEY_KEY_PREFIX)); while (iter.hasNext()) { Entry entry = iter.next(); @@ -361,10 +301,6 @@ public class LeveldbRMStateStore extends RMStateStore { } } catch (DBException e) { throw new IOException(e); - } finally { - if (iter != null) { - iter.close(); - } } return numKeys; } @@ -382,9 +318,7 @@ public class LeveldbRMStateStore extends RMStateStore { private int loadRMDTSecretManagerTokens(RMState state) throws IOException { int numTokens = 0; - LeveldbIterator iter = null; - try { - iter = new LeveldbIterator(db); + try (LeveldbIterator iter = new LeveldbIterator(db)) { iter.seek(bytes(RM_DT_TOKEN_KEY_PREFIX)); while (iter.hasNext()) { Entry entry = iter.next(); @@ -404,17 +338,13 @@ public class LeveldbRMStateStore extends RMStateStore { } } catch (DBException e) { throw new IOException(e); - } finally { - if (iter != null) { - iter.close(); - } } return numTokens; } private RMDelegationTokenIdentifierData loadDelegationToken(byte[] data) throws IOException { - RMDelegationTokenIdentifierData tokenData = null; + RMDelegationTokenIdentifierData tokenData; DataInputStream in = new DataInputStream(new ByteArrayInputStream(data)); try { tokenData = RMStateStoreUtils.readRMDelegationTokenIdentifierData(in); @@ -426,7 +356,7 @@ public class LeveldbRMStateStore extends RMStateStore { private void loadRMDTSecretManagerTokenSequenceNumber(RMState state) throws IOException { - byte[] data = null; + byte[] data; try { data = db.get(bytes(RM_DT_SEQUENCE_NUMBER_KEY)); } catch (DBException e) { @@ -445,9 +375,7 @@ public class LeveldbRMStateStore extends RMStateStore { private void loadRMApps(RMState state) throws IOException { int numApps = 0; int numAppAttempts = 0; - LeveldbIterator iter = null; - try { - iter = new LeveldbIterator(db); + try (LeveldbIterator iter = new LeveldbIterator(db)) { iter.seek(bytes(RM_APP_KEY_PREFIX)); while (iter.hasNext()) { Entry entry = iter.next(); @@ -467,10 +395,6 @@ public class LeveldbRMStateStore extends RMStateStore { } } catch (DBException e) { throw new IOException(e); - } finally { - if (iter != null) { - iter.close(); - } } LOG.info("Recovered " + numApps + " applications and " + numAppAttempts + " application attempts"); @@ -523,7 +447,7 @@ public class LeveldbRMStateStore extends RMStateStore { @VisibleForTesting ApplicationStateData loadRMAppState(ApplicationId appId) throws IOException { String appKey = getApplicationNodeKey(appId); - byte[] data = null; + byte[] data; try { data = db.get(bytes(appKey)); } catch (DBException e) { @@ -539,7 +463,7 @@ public class LeveldbRMStateStore extends RMStateStore { ApplicationAttemptStateData loadRMAppAttemptState( ApplicationAttemptId attemptId) throws IOException { String attemptKey = getApplicationAttemptNodeKey(attemptId); - byte[] data = null; + byte[] data; try { data = db.get(bytes(attemptKey)); } catch (DBException e) { @@ -668,8 +592,7 @@ public class LeveldbRMStateStore extends RMStateStore { appState.getApplicationSubmissionContext().getApplicationId(); String appKey = getApplicationNodeKey(appId); try { - WriteBatch batch = db.createWriteBatch(); - try { + try (WriteBatch batch = db.createWriteBatch()) { batch.delete(bytes(appKey)); for (ApplicationAttemptId attemptId : appState.attempts.keySet()) { String attemptKey = getApplicationAttemptNodeKey(appKey, attemptId); @@ -680,8 +603,6 @@ public class LeveldbRMStateStore extends RMStateStore { + appState.attempts.size() + " attempts" + " at " + appKey); } db.write(batch); - } finally { - batch.close(); } } catch (DBException e) { throw new IOException(e); @@ -693,16 +614,13 @@ public class LeveldbRMStateStore extends RMStateStore { ReservationAllocationStateProto reservationAllocation, String planName, String reservationIdName) throws Exception { try { - WriteBatch batch = db.createWriteBatch(); - try { + try (WriteBatch batch = db.createWriteBatch()) { String key = getReservationNodeKey(planName, reservationIdName); LOG.debug("Storing state for reservation {} plan {} at {}", reservationIdName, planName, key); batch.put(bytes(key), reservationAllocation.toByteArray()); db.write(batch); - } finally { - batch.close(); } } catch (DBException e) { throw new IOException(e); @@ -713,16 +631,13 @@ public class LeveldbRMStateStore extends RMStateStore { protected void removeReservationState(String planName, String reservationIdName) throws Exception { try { - WriteBatch batch = db.createWriteBatch(); - try { + try (WriteBatch batch = db.createWriteBatch()) { String reservationKey = getReservationNodeKey(planName, reservationIdName); batch.delete(bytes(reservationKey)); LOG.debug("Removing state for reservation {} plan {} at {}", reservationIdName, planName, reservationKey); db.write(batch); - } finally { - batch.close(); } } catch (DBException e) { throw new IOException(e); @@ -736,10 +651,9 @@ public class LeveldbRMStateStore extends RMStateStore { new RMDelegationTokenIdentifierData(tokenId, renewDate); LOG.debug("Storing token to {}", tokenKey); try { - WriteBatch batch = db.createWriteBatch(); - try { + try (WriteBatch batch = db.createWriteBatch()) { batch.put(bytes(tokenKey), tokenData.toByteArray()); - if(!isUpdate) { + if (!isUpdate) { ByteArrayOutputStream bs = new ByteArrayOutputStream(); try (DataOutputStream ds = new DataOutputStream(bs)) { ds.writeInt(tokenId.getSequenceNumber()); @@ -749,8 +663,6 @@ public class LeveldbRMStateStore extends RMStateStore { batch.put(bytes(RM_DT_SEQUENCE_NUMBER_KEY), bs.toByteArray()); } db.write(batch); - } finally { - batch.close(); } } catch (DBException e) { throw new IOException(e); @@ -789,11 +701,8 @@ public class LeveldbRMStateStore extends RMStateStore { String dbKey = getRMDTMasterKeyNodeKey(masterKey); LOG.debug("Storing token master key to {}", dbKey); ByteArrayOutputStream os = new ByteArrayOutputStream(); - DataOutputStream out = new DataOutputStream(os); - try { + try (DataOutputStream out = new DataOutputStream(os)) { masterKey.write(out); - } finally { - out.close(); } try { db.put(bytes(dbKey), os.toByteArray()); @@ -833,13 +742,10 @@ public class LeveldbRMStateStore extends RMStateStore { String caPrivateKeyKey = getProxyCAPrivateKeyNodeKey(); try { - WriteBatch batch = db.createWriteBatch(); - try { + try (WriteBatch batch = db.createWriteBatch()) { batch.put(bytes(caCertKey), caCertData); batch.put(bytes(caPrivateKeyKey), caPrivateKeyData); db.write(batch); - } finally { - batch.close(); } } catch (DBException e) { throw new IOException(e); @@ -871,9 +777,7 @@ public class LeveldbRMStateStore extends RMStateStore { @VisibleForTesting int getNumEntriesInDatabase() throws IOException { int numEntries = 0; - LeveldbIterator iter = null; - try { - iter = new LeveldbIterator(db); + try (LeveldbIterator iter = new LeveldbIterator(db)) { iter.seekToFirst(); while (iter.hasNext()) { Entry entry = iter.next(); @@ -882,26 +786,12 @@ public class LeveldbRMStateStore extends RMStateStore { } } catch (DBException e) { throw new IOException(e); - } finally { - if (iter != null) { - iter.close(); - } } return numEntries; } - private class CompactionTimerTask extends TimerTask { - @Override - public void run() { - long start = Time.monotonicNow(); - LOG.info("Starting full compaction cycle"); - try { - db.compactRange(null, null); - } catch (DBException e) { - LOG.error("Error compacting database", e); - } - long duration = Time.monotonicNow() - start; - LOG.info("Full compaction cycle completed in " + duration + " msec"); - } + @VisibleForTesting + protected void setDbManager(DBManager dbManager) { + this.dbManager = dbManager; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java index 8f5dc6a273..3fbdb3067a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java @@ -19,20 +19,16 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf; import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.yarn.server.resourcemanager.DBManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.util.Time; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.proto.YarnServerCommonProtos; import org.apache.hadoop.yarn.server.records.Version; -import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; -import org.fusesource.leveldbjni.JniDBFactory; -import org.fusesource.leveldbjni.internal.NativeDB; import org.iq80.leveldb.DB; import org.iq80.leveldb.DBComparator; import org.iq80.leveldb.DBException; @@ -52,9 +48,6 @@ import java.nio.charset.StandardCharsets; import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Timer; -import java.util.TimerTask; -import java.util.function.Consumer; import static org.fusesource.leveldbjni.JniDBFactory.bytes; @@ -72,6 +65,8 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore { private static final String CONF_VERSION_NAME = "conf-version-store"; private static final String CONF_VERSION_KEY = "conf-version"; private DB db; + private DBManager dbManager; + private DBManager versionDbManager; private DB versionDb; private long maxLogs; private Configuration conf; @@ -79,23 +74,25 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore { @VisibleForTesting protected static final Version CURRENT_VERSION_INFO = Version .newInstance(0, 1); - private long compactionIntervalMsec; @Override public void initialize(Configuration config, Configuration schedConf, RMContext rmContext) throws IOException { this.conf = config; this.initSchedConf = schedConf; + this.dbManager = new DBManager(); + this.versionDbManager = new DBManager(); try { initDatabase(); this.maxLogs = config.getLong( YarnConfiguration.RM_SCHEDCONF_MAX_LOGS, YarnConfiguration.DEFAULT_RM_SCHEDCONF_LEVELDB_MAX_LOGS); - this.compactionIntervalMsec = config.getLong( + long compactionIntervalMsec = config.getLong( YarnConfiguration.RM_SCHEDCONF_LEVELDB_COMPACTION_INTERVAL_SECS, YarnConfiguration .DEFAULT_RM_SCHEDCONF_LEVELDB_COMPACTION_INTERVAL_SECS) * 1000; - startCompactionTimer(); + dbManager.startCompactionTimer(compactionIntervalMsec, + this.getClass().getSimpleName()); } catch (Exception e) { throw new IOException(e); } @@ -114,7 +111,7 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore { confOptions.createIfMissing(false); File confVersionFile = new File(confVersion.toString()); - versionDb = initDatabaseHelper(confVersionFile, confOptions, + versionDb = versionDbManager.initDatabase(confVersionFile, confOptions, this::initVersionDb); Path storeRoot = createStorageDir(DB_NAME); @@ -154,7 +151,7 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore { }); LOG.info("Using conf database at {}", storeRoot); File dbFile = new File(storeRoot.toString()); - db = initDatabaseHelper(dbFile, options, this::initDb); + db = dbManager.initDatabase(dbFile, options, this::initDb); } private void initVersionDb(DB database) { @@ -170,30 +167,6 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore { increaseConfigVersion(); } - private DB initDatabaseHelper(File configurationFile, Options options, - Consumer initMethod) throws Exception { - DB database; - try { - database = JniDBFactory.factory.open(configurationFile, options); - } catch (NativeDB.DBException e) { - if (e.isNotFound() || e.getMessage().contains(" does not exist ")) { - LOG.info("Creating configuration version/database at {}", - configurationFile); - options.createIfMissing(true); - try { - database = JniDBFactory.factory.open(configurationFile, options); - initMethod.accept(database); - } catch (DBException dbErr) { - throw new IOException(dbErr.getMessage(), dbErr); - } - } else { - throw e; - } - } - - return database; - } - private Path createStorageDir(String storageName) throws IOException { Path root = getStorageDir(storageName); FileSystem fs = FileSystem.getLocal(conf); @@ -212,12 +185,8 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore { @Override public void close() throws IOException { - if (db != null) { - db.close(); - } - if (versionDb != null) { - versionDb.close(); - } + dbManager.close(); + versionDbManager.close(); } @Override @@ -313,28 +282,9 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore { return null; // unimplemented } - private void startCompactionTimer() { - if (compactionIntervalMsec > 0) { - Timer compactionTimer = new Timer( - this.getClass().getSimpleName() + " compaction timer", true); - compactionTimer.schedule(new CompactionTimerTask(), - compactionIntervalMsec, compactionIntervalMsec); - } - } - @Override public Version getConfStoreVersion() throws Exception { - Version version = null; - try { - byte[] data = db.get(bytes(VERSION_KEY)); - if (data != null) { - version = new VersionPBImpl(YarnServerCommonProtos.VersionProto - .parseFrom(data)); - } - } catch (DBException e) { - throw new IOException(e); - } - return version; + return dbManager.loadVersion(VERSION_KEY); } @VisibleForTesting @@ -350,37 +300,20 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore { @Override public void storeVersion() throws Exception { - storeVersion(CURRENT_VERSION_INFO); - } - - @VisibleForTesting - protected void storeVersion(Version version) throws Exception { - byte[] data = ((VersionPBImpl) version).getProto() - .toByteArray(); try { - db.put(bytes(VERSION_KEY), data); + storeVersion(CURRENT_VERSION_INFO); } catch (DBException e) { throw new IOException(e); } } + @VisibleForTesting + protected void storeVersion(Version version) { + dbManager.storeVersion(VERSION_KEY, version); + } + @Override public Version getCurrentVersion() { return CURRENT_VERSION_INFO; } - - private class CompactionTimerTask extends TimerTask { - @Override - public void run() { - long start = Time.monotonicNow(); - LOG.info("Starting full compaction cycle"); - try { - db.compactRange(null, null); - } catch (DBException e) { - LOG.error("Error compacting database", e); - } - long duration = Time.monotonicNow() - start; - LOG.info("Full compaction cycle completed in {} msec", duration); - } - } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestLeveldbRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestLeveldbRMStateStore.java index 7a4ead48fc..e93599dd47 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestLeveldbRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestLeveldbRMStateStore.java @@ -25,14 +25,17 @@ import static org.mockito.Mockito.verify; import java.io.File; import java.io.IOException; +import java.util.function.Consumer; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.records.Version; +import org.apache.hadoop.yarn.server.resourcemanager.DBManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.fusesource.leveldbjni.JniDBFactory; import org.iq80.leveldb.DB; +import org.iq80.leveldb.Options; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -131,19 +134,23 @@ public class TestLeveldbRMStateStore extends RMStateStoreTestBase { } @Test(timeout = 60000) - public void testCompactionCycle() throws Exception { + public void testCompactionCycle() { final DB mockdb = mock(DB.class); conf.setLong(YarnConfiguration.RM_LEVELDB_COMPACTION_INTERVAL_SECS, 1); - stateStore = new LeveldbRMStateStore() { + stateStore = new LeveldbRMStateStore(); + DBManager dbManager = new DBManager() { @Override - protected DB openDatabase() throws Exception { + public DB initDatabase(File configurationFile, Options options, + Consumer initMethod) { return mockdb; } }; + dbManager.setDb(mockdb); + stateStore.setDbManager(dbManager); stateStore.init(conf); stateStore.start(); verify(mockdb, timeout(10000)).compactRange( - (byte[]) isNull(), (byte[]) isNull()); + isNull(), isNull()); } @Test @@ -181,12 +188,12 @@ public class TestLeveldbRMStateStore extends RMStateStoreTestBase { } @Override - public void writeVersion(Version version) throws Exception { - stateStore.dbStoreVersion(version); + public void writeVersion(Version version) { + stateStore.storeVersion(version); } @Override - public Version getCurrentVersion() throws Exception { + public Version getCurrentVersion() { return stateStore.getCurrentVersion(); }