YARN-10189. Code cleanup in LeveldbRMStateStore. Contributed by Benjamin Teke

This commit is contained in:
Szilard Nemeth 2020-04-18 09:57:27 +02:00
parent ac40daece1
commit 76900b4f5b
4 changed files with 197 additions and 236 deletions

View File

@ -0,0 +1,131 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
import org.fusesource.leveldbjni.JniDBFactory;
import org.fusesource.leveldbjni.internal.NativeDB;
import org.iq80.leveldb.DB;
import org.iq80.leveldb.DBException;
import org.iq80.leveldb.Options;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.Timer;
import java.util.TimerTask;
import java.util.function.Consumer;
import static org.fusesource.leveldbjni.JniDBFactory.bytes;
public class DBManager implements Closeable {
public static final Logger LOG =
LoggerFactory.getLogger(DBManager.class);
private DB db;
private Timer compactionTimer;
public DB initDatabase(File configurationFile, Options options,
Consumer<DB> initMethod) throws Exception {
try {
db = JniDBFactory.factory.open(configurationFile, options);
} catch (NativeDB.DBException e) {
if (e.isNotFound() || e.getMessage().contains(" does not exist ")) {
LOG.info("Creating configuration version/database at {}",
configurationFile);
options.createIfMissing(true);
try {
db = JniDBFactory.factory.open(configurationFile, options);
initMethod.accept(db);
} catch (DBException dbErr) {
throw new IOException(dbErr.getMessage(), dbErr);
}
} else {
throw e;
}
}
return db;
}
public void close() throws IOException {
if (compactionTimer != null) {
compactionTimer.cancel();
compactionTimer = null;
}
if (db != null) {
db.close();
db = null;
}
}
public void storeVersion(String versionKey, Version versionValue) {
byte[] data = ((VersionPBImpl) versionValue).getProto().toByteArray();
db.put(bytes(versionKey), data);
}
public Version loadVersion(String versionKey) throws Exception {
Version version = null;
try {
byte[] data = db.get(bytes(versionKey));
if (data != null) {
version = new VersionPBImpl(YarnServerCommonProtos.VersionProto
.parseFrom(data));
}
} catch (DBException e) {
throw new IOException(e);
}
return version;
}
@VisibleForTesting
public void setDb(DB db) {
this.db = db;
}
public void startCompactionTimer(long compactionIntervalMsec,
String className) {
if (compactionIntervalMsec > 0) {
compactionTimer = new Timer(
className + " compaction timer", true);
compactionTimer.schedule(new CompactionTimerTask(),
compactionIntervalMsec, compactionIntervalMsec);
}
}
private class CompactionTimerTask extends TimerTask {
@Override
public void run() {
long start = Time.monotonicNow();
LOG.info("Starting full compaction cycle");
try {
db.compactRange(null, null);
} catch (DBException e) {
LOG.error("Error compacting database", e);
}
long duration = Time.monotonicNow() - start;
LOG.info("Full compaction cycle completed in " + duration + " msec");
}
}
}

View File

@ -31,9 +31,8 @@ import java.security.PrivateKey;
import java.security.cert.X509Certificate; import java.security.cert.X509Certificate;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Timer;
import java.util.TimerTask;
import org.apache.hadoop.yarn.server.resourcemanager.DBManager;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -42,13 +41,11 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.token.delegation.DelegationKey; import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.AMRMTokenSecretManagerStateProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.AMRMTokenSecretManagerStateProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.EpochProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.EpochProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationAttemptStateDataProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationAttemptStateDataProto;
@ -56,7 +53,6 @@ import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.Appl
import org.apache.hadoop.yarn.proto.YarnProtos.ReservationAllocationStateProto; import org.apache.hadoop.yarn.proto.YarnProtos.ReservationAllocationStateProto;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.records.Version; import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
@ -66,8 +62,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.AM
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
import org.apache.hadoop.yarn.server.utils.LeveldbIterator; import org.apache.hadoop.yarn.server.utils.LeveldbIterator;
import org.fusesource.leveldbjni.JniDBFactory;
import org.fusesource.leveldbjni.internal.NativeDB;
import org.iq80.leveldb.DB; import org.iq80.leveldb.DB;
import org.iq80.leveldb.DBException; import org.iq80.leveldb.DBException;
import org.iq80.leveldb.Options; import org.iq80.leveldb.Options;
@ -100,7 +94,7 @@ public class LeveldbRMStateStore extends RMStateStore {
.newInstance(1, 1); .newInstance(1, 1);
private DB db; private DB db;
private Timer compactionTimer; private DBManager dbManager = new DBManager();
private long compactionIntervalMsec; private long compactionIntervalMsec;
private String getApplicationNodeKey(ApplicationId appId) { private String getApplicationNodeKey(ApplicationId appId) {
@ -140,7 +134,7 @@ public class LeveldbRMStateStore extends RMStateStore {
} }
@Override @Override
protected void initInternal(Configuration conf) throws Exception { protected void initInternal(Configuration conf) {
compactionIntervalMsec = conf.getLong( compactionIntervalMsec = conf.getLong(
YarnConfiguration.RM_LEVELDB_COMPACTION_INTERVAL_SECS, YarnConfiguration.RM_LEVELDB_COMPACTION_INTERVAL_SECS,
YarnConfiguration.DEFAULT_RM_LEVELDB_COMPACTION_INTERVAL_SECS) * 1000; YarnConfiguration.DEFAULT_RM_LEVELDB_COMPACTION_INTERVAL_SECS) * 1000;
@ -165,55 +159,20 @@ public class LeveldbRMStateStore extends RMStateStore {
@Override @Override
protected void startInternal() throws Exception { protected void startInternal() throws Exception {
db = openDatabase();
startCompactionTimer();
}
protected DB openDatabase() throws Exception {
Path storeRoot = createStorageDir(); Path storeRoot = createStorageDir();
Options options = new Options(); Options options = new Options();
options.createIfMissing(false); options.createIfMissing(false);
LOG.info("Using state database at " + storeRoot + " for recovery"); LOG.info("Using state database at " + storeRoot + " for recovery");
File dbfile = new File(storeRoot.toString()); File dbfile = new File(storeRoot.toString());
try { db = dbManager.initDatabase(dbfile, options, (database) ->
db = JniDBFactory.factory.open(dbfile, options); storeVersion(CURRENT_VERSION_INFO));
} catch (NativeDB.DBException e) { dbManager.startCompactionTimer(compactionIntervalMsec,
if (e.isNotFound() || e.getMessage().contains(" does not exist ")) { this.getClass().getSimpleName());
LOG.info("Creating state database at " + dbfile);
options.createIfMissing(true);
try {
db = JniDBFactory.factory.open(dbfile, options);
// store version
storeVersion();
} catch (DBException dbErr) {
throw new IOException(dbErr.getMessage(), dbErr);
}
} else {
throw e;
}
}
return db;
}
private void startCompactionTimer() {
if (compactionIntervalMsec > 0) {
compactionTimer = new Timer(
this.getClass().getSimpleName() + " compaction timer", true);
compactionTimer.schedule(new CompactionTimerTask(),
compactionIntervalMsec, compactionIntervalMsec);
}
} }
@Override @Override
protected void closeInternal() throws Exception { protected void closeInternal() throws Exception {
if (compactionTimer != null) { dbManager.close();
compactionTimer.cancel();
compactionTimer = null;
}
if (db != null) {
db.close();
db = null;
}
} }
@VisibleForTesting @VisibleForTesting
@ -228,33 +187,22 @@ public class LeveldbRMStateStore extends RMStateStore {
@Override @Override
protected Version loadVersion() throws Exception { protected Version loadVersion() throws Exception {
Version version = null; return dbManager.loadVersion(VERSION_NODE);
try {
byte[] data = db.get(bytes(VERSION_NODE));
if (data != null) {
version = new VersionPBImpl(VersionProto.parseFrom(data));
}
} catch (DBException e) {
throw new IOException(e);
}
return version;
} }
@Override @Override
protected void storeVersion() throws Exception { protected void storeVersion() throws Exception {
dbStoreVersion(CURRENT_VERSION_INFO);
}
void dbStoreVersion(Version state) throws IOException {
String key = VERSION_NODE;
byte[] data = ((VersionPBImpl) state).getProto().toByteArray();
try { try {
db.put(bytes(key), data); storeVersion(CURRENT_VERSION_INFO);
} catch (DBException e) { } catch (DBException e) {
throw new IOException(e); throw new IOException(e);
} }
} }
protected void storeVersion(Version version) {
dbManager.storeVersion(VERSION_NODE, version);
}
@Override @Override
protected Version getCurrentVersion() { protected Version getCurrentVersion() {
return CURRENT_VERSION_INFO; return CURRENT_VERSION_INFO;
@ -290,9 +238,7 @@ public class LeveldbRMStateStore extends RMStateStore {
private void loadReservationState(RMState rmState) throws IOException { private void loadReservationState(RMState rmState) throws IOException {
int numReservations = 0; int numReservations = 0;
LeveldbIterator iter = null; try (LeveldbIterator iter = new LeveldbIterator(db)) {
try {
iter = new LeveldbIterator(db);
iter.seek(bytes(RM_RESERVATION_KEY_PREFIX)); iter.seek(bytes(RM_RESERVATION_KEY_PREFIX));
while (iter.hasNext()) { while (iter.hasNext()) {
Entry<byte[],byte[]> entry = iter.next(); Entry<byte[],byte[]> entry = iter.next();
@ -324,10 +270,6 @@ public class LeveldbRMStateStore extends RMStateStore {
} }
} catch (DBException e) { } catch (DBException e) {
throw new IOException(e); throw new IOException(e);
} finally {
if (iter != null) {
iter.close();
}
} }
LOG.info("Recovered " + numReservations + " reservations"); LOG.info("Recovered " + numReservations + " reservations");
} }
@ -342,9 +284,7 @@ public class LeveldbRMStateStore extends RMStateStore {
private int loadRMDTSecretManagerKeys(RMState state) throws IOException { private int loadRMDTSecretManagerKeys(RMState state) throws IOException {
int numKeys = 0; int numKeys = 0;
LeveldbIterator iter = null; try (LeveldbIterator iter = new LeveldbIterator(db)) {
try {
iter = new LeveldbIterator(db);
iter.seek(bytes(RM_DT_MASTER_KEY_KEY_PREFIX)); iter.seek(bytes(RM_DT_MASTER_KEY_KEY_PREFIX));
while (iter.hasNext()) { while (iter.hasNext()) {
Entry<byte[],byte[]> entry = iter.next(); Entry<byte[],byte[]> entry = iter.next();
@ -361,10 +301,6 @@ public class LeveldbRMStateStore extends RMStateStore {
} }
} catch (DBException e) { } catch (DBException e) {
throw new IOException(e); throw new IOException(e);
} finally {
if (iter != null) {
iter.close();
}
} }
return numKeys; return numKeys;
} }
@ -382,9 +318,7 @@ public class LeveldbRMStateStore extends RMStateStore {
private int loadRMDTSecretManagerTokens(RMState state) throws IOException { private int loadRMDTSecretManagerTokens(RMState state) throws IOException {
int numTokens = 0; int numTokens = 0;
LeveldbIterator iter = null; try (LeveldbIterator iter = new LeveldbIterator(db)) {
try {
iter = new LeveldbIterator(db);
iter.seek(bytes(RM_DT_TOKEN_KEY_PREFIX)); iter.seek(bytes(RM_DT_TOKEN_KEY_PREFIX));
while (iter.hasNext()) { while (iter.hasNext()) {
Entry<byte[],byte[]> entry = iter.next(); Entry<byte[],byte[]> entry = iter.next();
@ -404,17 +338,13 @@ public class LeveldbRMStateStore extends RMStateStore {
} }
} catch (DBException e) { } catch (DBException e) {
throw new IOException(e); throw new IOException(e);
} finally {
if (iter != null) {
iter.close();
}
} }
return numTokens; return numTokens;
} }
private RMDelegationTokenIdentifierData loadDelegationToken(byte[] data) private RMDelegationTokenIdentifierData loadDelegationToken(byte[] data)
throws IOException { throws IOException {
RMDelegationTokenIdentifierData tokenData = null; RMDelegationTokenIdentifierData tokenData;
DataInputStream in = new DataInputStream(new ByteArrayInputStream(data)); DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
try { try {
tokenData = RMStateStoreUtils.readRMDelegationTokenIdentifierData(in); tokenData = RMStateStoreUtils.readRMDelegationTokenIdentifierData(in);
@ -426,7 +356,7 @@ public class LeveldbRMStateStore extends RMStateStore {
private void loadRMDTSecretManagerTokenSequenceNumber(RMState state) private void loadRMDTSecretManagerTokenSequenceNumber(RMState state)
throws IOException { throws IOException {
byte[] data = null; byte[] data;
try { try {
data = db.get(bytes(RM_DT_SEQUENCE_NUMBER_KEY)); data = db.get(bytes(RM_DT_SEQUENCE_NUMBER_KEY));
} catch (DBException e) { } catch (DBException e) {
@ -445,9 +375,7 @@ public class LeveldbRMStateStore extends RMStateStore {
private void loadRMApps(RMState state) throws IOException { private void loadRMApps(RMState state) throws IOException {
int numApps = 0; int numApps = 0;
int numAppAttempts = 0; int numAppAttempts = 0;
LeveldbIterator iter = null; try (LeveldbIterator iter = new LeveldbIterator(db)) {
try {
iter = new LeveldbIterator(db);
iter.seek(bytes(RM_APP_KEY_PREFIX)); iter.seek(bytes(RM_APP_KEY_PREFIX));
while (iter.hasNext()) { while (iter.hasNext()) {
Entry<byte[],byte[]> entry = iter.next(); Entry<byte[],byte[]> entry = iter.next();
@ -467,10 +395,6 @@ public class LeveldbRMStateStore extends RMStateStore {
} }
} catch (DBException e) { } catch (DBException e) {
throw new IOException(e); throw new IOException(e);
} finally {
if (iter != null) {
iter.close();
}
} }
LOG.info("Recovered " + numApps + " applications and " + numAppAttempts LOG.info("Recovered " + numApps + " applications and " + numAppAttempts
+ " application attempts"); + " application attempts");
@ -523,7 +447,7 @@ public class LeveldbRMStateStore extends RMStateStore {
@VisibleForTesting @VisibleForTesting
ApplicationStateData loadRMAppState(ApplicationId appId) throws IOException { ApplicationStateData loadRMAppState(ApplicationId appId) throws IOException {
String appKey = getApplicationNodeKey(appId); String appKey = getApplicationNodeKey(appId);
byte[] data = null; byte[] data;
try { try {
data = db.get(bytes(appKey)); data = db.get(bytes(appKey));
} catch (DBException e) { } catch (DBException e) {
@ -539,7 +463,7 @@ public class LeveldbRMStateStore extends RMStateStore {
ApplicationAttemptStateData loadRMAppAttemptState( ApplicationAttemptStateData loadRMAppAttemptState(
ApplicationAttemptId attemptId) throws IOException { ApplicationAttemptId attemptId) throws IOException {
String attemptKey = getApplicationAttemptNodeKey(attemptId); String attemptKey = getApplicationAttemptNodeKey(attemptId);
byte[] data = null; byte[] data;
try { try {
data = db.get(bytes(attemptKey)); data = db.get(bytes(attemptKey));
} catch (DBException e) { } catch (DBException e) {
@ -668,8 +592,7 @@ public class LeveldbRMStateStore extends RMStateStore {
appState.getApplicationSubmissionContext().getApplicationId(); appState.getApplicationSubmissionContext().getApplicationId();
String appKey = getApplicationNodeKey(appId); String appKey = getApplicationNodeKey(appId);
try { try {
WriteBatch batch = db.createWriteBatch(); try (WriteBatch batch = db.createWriteBatch()) {
try {
batch.delete(bytes(appKey)); batch.delete(bytes(appKey));
for (ApplicationAttemptId attemptId : appState.attempts.keySet()) { for (ApplicationAttemptId attemptId : appState.attempts.keySet()) {
String attemptKey = getApplicationAttemptNodeKey(appKey, attemptId); String attemptKey = getApplicationAttemptNodeKey(appKey, attemptId);
@ -680,8 +603,6 @@ public class LeveldbRMStateStore extends RMStateStore {
+ appState.attempts.size() + " attempts" + " at " + appKey); + appState.attempts.size() + " attempts" + " at " + appKey);
} }
db.write(batch); db.write(batch);
} finally {
batch.close();
} }
} catch (DBException e) { } catch (DBException e) {
throw new IOException(e); throw new IOException(e);
@ -693,16 +614,13 @@ public class LeveldbRMStateStore extends RMStateStore {
ReservationAllocationStateProto reservationAllocation, String planName, ReservationAllocationStateProto reservationAllocation, String planName,
String reservationIdName) throws Exception { String reservationIdName) throws Exception {
try { try {
WriteBatch batch = db.createWriteBatch(); try (WriteBatch batch = db.createWriteBatch()) {
try {
String key = getReservationNodeKey(planName, reservationIdName); String key = getReservationNodeKey(planName, reservationIdName);
LOG.debug("Storing state for reservation {} plan {} at {}", LOG.debug("Storing state for reservation {} plan {} at {}",
reservationIdName, planName, key); reservationIdName, planName, key);
batch.put(bytes(key), reservationAllocation.toByteArray()); batch.put(bytes(key), reservationAllocation.toByteArray());
db.write(batch); db.write(batch);
} finally {
batch.close();
} }
} catch (DBException e) { } catch (DBException e) {
throw new IOException(e); throw new IOException(e);
@ -713,16 +631,13 @@ public class LeveldbRMStateStore extends RMStateStore {
protected void removeReservationState(String planName, protected void removeReservationState(String planName,
String reservationIdName) throws Exception { String reservationIdName) throws Exception {
try { try {
WriteBatch batch = db.createWriteBatch(); try (WriteBatch batch = db.createWriteBatch()) {
try {
String reservationKey = String reservationKey =
getReservationNodeKey(planName, reservationIdName); getReservationNodeKey(planName, reservationIdName);
batch.delete(bytes(reservationKey)); batch.delete(bytes(reservationKey));
LOG.debug("Removing state for reservation {} plan {} at {}", LOG.debug("Removing state for reservation {} plan {} at {}",
reservationIdName, planName, reservationKey); reservationIdName, planName, reservationKey);
db.write(batch); db.write(batch);
} finally {
batch.close();
} }
} catch (DBException e) { } catch (DBException e) {
throw new IOException(e); throw new IOException(e);
@ -736,10 +651,9 @@ public class LeveldbRMStateStore extends RMStateStore {
new RMDelegationTokenIdentifierData(tokenId, renewDate); new RMDelegationTokenIdentifierData(tokenId, renewDate);
LOG.debug("Storing token to {}", tokenKey); LOG.debug("Storing token to {}", tokenKey);
try { try {
WriteBatch batch = db.createWriteBatch(); try (WriteBatch batch = db.createWriteBatch()) {
try {
batch.put(bytes(tokenKey), tokenData.toByteArray()); batch.put(bytes(tokenKey), tokenData.toByteArray());
if(!isUpdate) { if (!isUpdate) {
ByteArrayOutputStream bs = new ByteArrayOutputStream(); ByteArrayOutputStream bs = new ByteArrayOutputStream();
try (DataOutputStream ds = new DataOutputStream(bs)) { try (DataOutputStream ds = new DataOutputStream(bs)) {
ds.writeInt(tokenId.getSequenceNumber()); ds.writeInt(tokenId.getSequenceNumber());
@ -749,8 +663,6 @@ public class LeveldbRMStateStore extends RMStateStore {
batch.put(bytes(RM_DT_SEQUENCE_NUMBER_KEY), bs.toByteArray()); batch.put(bytes(RM_DT_SEQUENCE_NUMBER_KEY), bs.toByteArray());
} }
db.write(batch); db.write(batch);
} finally {
batch.close();
} }
} catch (DBException e) { } catch (DBException e) {
throw new IOException(e); throw new IOException(e);
@ -789,11 +701,8 @@ public class LeveldbRMStateStore extends RMStateStore {
String dbKey = getRMDTMasterKeyNodeKey(masterKey); String dbKey = getRMDTMasterKeyNodeKey(masterKey);
LOG.debug("Storing token master key to {}", dbKey); LOG.debug("Storing token master key to {}", dbKey);
ByteArrayOutputStream os = new ByteArrayOutputStream(); ByteArrayOutputStream os = new ByteArrayOutputStream();
DataOutputStream out = new DataOutputStream(os); try (DataOutputStream out = new DataOutputStream(os)) {
try {
masterKey.write(out); masterKey.write(out);
} finally {
out.close();
} }
try { try {
db.put(bytes(dbKey), os.toByteArray()); db.put(bytes(dbKey), os.toByteArray());
@ -833,13 +742,10 @@ public class LeveldbRMStateStore extends RMStateStore {
String caPrivateKeyKey = getProxyCAPrivateKeyNodeKey(); String caPrivateKeyKey = getProxyCAPrivateKeyNodeKey();
try { try {
WriteBatch batch = db.createWriteBatch(); try (WriteBatch batch = db.createWriteBatch()) {
try {
batch.put(bytes(caCertKey), caCertData); batch.put(bytes(caCertKey), caCertData);
batch.put(bytes(caPrivateKeyKey), caPrivateKeyData); batch.put(bytes(caPrivateKeyKey), caPrivateKeyData);
db.write(batch); db.write(batch);
} finally {
batch.close();
} }
} catch (DBException e) { } catch (DBException e) {
throw new IOException(e); throw new IOException(e);
@ -871,9 +777,7 @@ public class LeveldbRMStateStore extends RMStateStore {
@VisibleForTesting @VisibleForTesting
int getNumEntriesInDatabase() throws IOException { int getNumEntriesInDatabase() throws IOException {
int numEntries = 0; int numEntries = 0;
LeveldbIterator iter = null; try (LeveldbIterator iter = new LeveldbIterator(db)) {
try {
iter = new LeveldbIterator(db);
iter.seekToFirst(); iter.seekToFirst();
while (iter.hasNext()) { while (iter.hasNext()) {
Entry<byte[], byte[]> entry = iter.next(); Entry<byte[], byte[]> entry = iter.next();
@ -882,26 +786,12 @@ public class LeveldbRMStateStore extends RMStateStore {
} }
} catch (DBException e) { } catch (DBException e) {
throw new IOException(e); throw new IOException(e);
} finally {
if (iter != null) {
iter.close();
}
} }
return numEntries; return numEntries;
} }
private class CompactionTimerTask extends TimerTask { @VisibleForTesting
@Override protected void setDbManager(DBManager dbManager) {
public void run() { this.dbManager = dbManager;
long start = Time.monotonicNow();
LOG.info("Starting full compaction cycle");
try {
db.compactRange(null, null);
} catch (DBException e) {
LOG.error("Error compacting database", e);
}
long duration = Time.monotonicNow() - start;
LOG.info("Full compaction cycle completed in " + duration + " msec");
}
} }
} }

View File

@ -19,20 +19,16 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf; package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.yarn.server.resourcemanager.DBManager;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos;
import org.apache.hadoop.yarn.server.records.Version; import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.fusesource.leveldbjni.JniDBFactory;
import org.fusesource.leveldbjni.internal.NativeDB;
import org.iq80.leveldb.DB; import org.iq80.leveldb.DB;
import org.iq80.leveldb.DBComparator; import org.iq80.leveldb.DBComparator;
import org.iq80.leveldb.DBException; import org.iq80.leveldb.DBException;
@ -52,9 +48,6 @@ import java.nio.charset.StandardCharsets;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.function.Consumer;
import static org.fusesource.leveldbjni.JniDBFactory.bytes; import static org.fusesource.leveldbjni.JniDBFactory.bytes;
@ -72,6 +65,8 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
private static final String CONF_VERSION_NAME = "conf-version-store"; private static final String CONF_VERSION_NAME = "conf-version-store";
private static final String CONF_VERSION_KEY = "conf-version"; private static final String CONF_VERSION_KEY = "conf-version";
private DB db; private DB db;
private DBManager dbManager;
private DBManager versionDbManager;
private DB versionDb; private DB versionDb;
private long maxLogs; private long maxLogs;
private Configuration conf; private Configuration conf;
@ -79,23 +74,25 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
@VisibleForTesting @VisibleForTesting
protected static final Version CURRENT_VERSION_INFO = Version protected static final Version CURRENT_VERSION_INFO = Version
.newInstance(0, 1); .newInstance(0, 1);
private long compactionIntervalMsec;
@Override @Override
public void initialize(Configuration config, Configuration schedConf, public void initialize(Configuration config, Configuration schedConf,
RMContext rmContext) throws IOException { RMContext rmContext) throws IOException {
this.conf = config; this.conf = config;
this.initSchedConf = schedConf; this.initSchedConf = schedConf;
this.dbManager = new DBManager();
this.versionDbManager = new DBManager();
try { try {
initDatabase(); initDatabase();
this.maxLogs = config.getLong( this.maxLogs = config.getLong(
YarnConfiguration.RM_SCHEDCONF_MAX_LOGS, YarnConfiguration.RM_SCHEDCONF_MAX_LOGS,
YarnConfiguration.DEFAULT_RM_SCHEDCONF_LEVELDB_MAX_LOGS); YarnConfiguration.DEFAULT_RM_SCHEDCONF_LEVELDB_MAX_LOGS);
this.compactionIntervalMsec = config.getLong( long compactionIntervalMsec = config.getLong(
YarnConfiguration.RM_SCHEDCONF_LEVELDB_COMPACTION_INTERVAL_SECS, YarnConfiguration.RM_SCHEDCONF_LEVELDB_COMPACTION_INTERVAL_SECS,
YarnConfiguration YarnConfiguration
.DEFAULT_RM_SCHEDCONF_LEVELDB_COMPACTION_INTERVAL_SECS) * 1000; .DEFAULT_RM_SCHEDCONF_LEVELDB_COMPACTION_INTERVAL_SECS) * 1000;
startCompactionTimer(); dbManager.startCompactionTimer(compactionIntervalMsec,
this.getClass().getSimpleName());
} catch (Exception e) { } catch (Exception e) {
throw new IOException(e); throw new IOException(e);
} }
@ -114,7 +111,7 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
confOptions.createIfMissing(false); confOptions.createIfMissing(false);
File confVersionFile = new File(confVersion.toString()); File confVersionFile = new File(confVersion.toString());
versionDb = initDatabaseHelper(confVersionFile, confOptions, versionDb = versionDbManager.initDatabase(confVersionFile, confOptions,
this::initVersionDb); this::initVersionDb);
Path storeRoot = createStorageDir(DB_NAME); Path storeRoot = createStorageDir(DB_NAME);
@ -154,7 +151,7 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
}); });
LOG.info("Using conf database at {}", storeRoot); LOG.info("Using conf database at {}", storeRoot);
File dbFile = new File(storeRoot.toString()); File dbFile = new File(storeRoot.toString());
db = initDatabaseHelper(dbFile, options, this::initDb); db = dbManager.initDatabase(dbFile, options, this::initDb);
} }
private void initVersionDb(DB database) { private void initVersionDb(DB database) {
@ -170,30 +167,6 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
increaseConfigVersion(); increaseConfigVersion();
} }
private DB initDatabaseHelper(File configurationFile, Options options,
Consumer<DB> initMethod) throws Exception {
DB database;
try {
database = JniDBFactory.factory.open(configurationFile, options);
} catch (NativeDB.DBException e) {
if (e.isNotFound() || e.getMessage().contains(" does not exist ")) {
LOG.info("Creating configuration version/database at {}",
configurationFile);
options.createIfMissing(true);
try {
database = JniDBFactory.factory.open(configurationFile, options);
initMethod.accept(database);
} catch (DBException dbErr) {
throw new IOException(dbErr.getMessage(), dbErr);
}
} else {
throw e;
}
}
return database;
}
private Path createStorageDir(String storageName) throws IOException { private Path createStorageDir(String storageName) throws IOException {
Path root = getStorageDir(storageName); Path root = getStorageDir(storageName);
FileSystem fs = FileSystem.getLocal(conf); FileSystem fs = FileSystem.getLocal(conf);
@ -212,12 +185,8 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
@Override @Override
public void close() throws IOException { public void close() throws IOException {
if (db != null) { dbManager.close();
db.close(); versionDbManager.close();
}
if (versionDb != null) {
versionDb.close();
}
} }
@Override @Override
@ -313,28 +282,9 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
return null; // unimplemented return null; // unimplemented
} }
private void startCompactionTimer() {
if (compactionIntervalMsec > 0) {
Timer compactionTimer = new Timer(
this.getClass().getSimpleName() + " compaction timer", true);
compactionTimer.schedule(new CompactionTimerTask(),
compactionIntervalMsec, compactionIntervalMsec);
}
}
@Override @Override
public Version getConfStoreVersion() throws Exception { public Version getConfStoreVersion() throws Exception {
Version version = null; return dbManager.loadVersion(VERSION_KEY);
try {
byte[] data = db.get(bytes(VERSION_KEY));
if (data != null) {
version = new VersionPBImpl(YarnServerCommonProtos.VersionProto
.parseFrom(data));
}
} catch (DBException e) {
throw new IOException(e);
}
return version;
} }
@VisibleForTesting @VisibleForTesting
@ -350,37 +300,20 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
@Override @Override
public void storeVersion() throws Exception { public void storeVersion() throws Exception {
storeVersion(CURRENT_VERSION_INFO);
}
@VisibleForTesting
protected void storeVersion(Version version) throws Exception {
byte[] data = ((VersionPBImpl) version).getProto()
.toByteArray();
try { try {
db.put(bytes(VERSION_KEY), data); storeVersion(CURRENT_VERSION_INFO);
} catch (DBException e) { } catch (DBException e) {
throw new IOException(e); throw new IOException(e);
} }
} }
@VisibleForTesting
protected void storeVersion(Version version) {
dbManager.storeVersion(VERSION_KEY, version);
}
@Override @Override
public Version getCurrentVersion() { public Version getCurrentVersion() {
return CURRENT_VERSION_INFO; return CURRENT_VERSION_INFO;
} }
private class CompactionTimerTask extends TimerTask {
@Override
public void run() {
long start = Time.monotonicNow();
LOG.info("Starting full compaction cycle");
try {
db.compactRange(null, null);
} catch (DBException e) {
LOG.error("Error compacting database", e);
}
long duration = Time.monotonicNow() - start;
LOG.info("Full compaction cycle completed in {} msec", duration);
}
}
} }

View File

@ -25,14 +25,17 @@ import static org.mockito.Mockito.verify;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.function.Consumer;
import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.records.Version; import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.resourcemanager.DBManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.fusesource.leveldbjni.JniDBFactory; import org.fusesource.leveldbjni.JniDBFactory;
import org.iq80.leveldb.DB; import org.iq80.leveldb.DB;
import org.iq80.leveldb.Options;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -131,19 +134,23 @@ public class TestLeveldbRMStateStore extends RMStateStoreTestBase {
} }
@Test(timeout = 60000) @Test(timeout = 60000)
public void testCompactionCycle() throws Exception { public void testCompactionCycle() {
final DB mockdb = mock(DB.class); final DB mockdb = mock(DB.class);
conf.setLong(YarnConfiguration.RM_LEVELDB_COMPACTION_INTERVAL_SECS, 1); conf.setLong(YarnConfiguration.RM_LEVELDB_COMPACTION_INTERVAL_SECS, 1);
stateStore = new LeveldbRMStateStore() { stateStore = new LeveldbRMStateStore();
DBManager dbManager = new DBManager() {
@Override @Override
protected DB openDatabase() throws Exception { public DB initDatabase(File configurationFile, Options options,
Consumer<DB> initMethod) {
return mockdb; return mockdb;
} }
}; };
dbManager.setDb(mockdb);
stateStore.setDbManager(dbManager);
stateStore.init(conf); stateStore.init(conf);
stateStore.start(); stateStore.start();
verify(mockdb, timeout(10000)).compactRange( verify(mockdb, timeout(10000)).compactRange(
(byte[]) isNull(), (byte[]) isNull()); isNull(), isNull());
} }
@Test @Test
@ -181,12 +188,12 @@ public class TestLeveldbRMStateStore extends RMStateStoreTestBase {
} }
@Override @Override
public void writeVersion(Version version) throws Exception { public void writeVersion(Version version) {
stateStore.dbStoreVersion(version); stateStore.storeVersion(version);
} }
@Override @Override
public Version getCurrentVersion() throws Exception { public Version getCurrentVersion() {
return stateStore.getCurrentVersion(); return stateStore.getCurrentVersion();
} }