YARN-5947: Create LeveldbConfigurationStore class using Leveldb as backing store. Contributed by Jonathan Hung

Authored by Xuan on 2017-07-31 16:48:40 -07:00; committed by Jonathan Hung
parent 04f9e80bb2
commit 74ba6ffa0b
8 changed files with 414 additions and 6 deletions

View File

@@ -677,8 +677,21 @@ public static boolean isAclEnabled(Configuration conf) {
public static final String SCHEDULER_CONFIGURATION_STORE_CLASS =
YARN_PREFIX + "scheduler.configuration.store.class";
public static final String MEMORY_CONFIGURATION_STORE = "memory";
public static final String LEVELDB_CONFIGURATION_STORE = "leveldb";
public static final String DEFAULT_CONFIGURATION_STORE =
MEMORY_CONFIGURATION_STORE;
public static final String RM_SCHEDCONF_STORE_PATH = YARN_PREFIX
+ "scheduler.configuration.leveldb-store.path";
public static final String RM_SCHEDCONF_LEVELDB_COMPACTION_INTERVAL_SECS =
YARN_PREFIX
+ "scheduler.configuration.leveldb-store.compaction-interval-secs";
public static final long
DEFAULT_RM_SCHEDCONF_LEVELDB_COMPACTION_INTERVAL_SECS = 60 * 60 * 24L;
public static final String RM_SCHEDCONF_LEVELDB_MAX_LOGS =
YARN_PREFIX + "scheduler.configuration.leveldb-store.max-logs";
public static final int DEFAULT_RM_SCHEDCONF_LEVELDB_MAX_LOGS = 1000;
public static final String RM_SCHEDULER_MUTATION_ACL_POLICY_CLASS =
YARN_PREFIX + "scheduler.configuration.mutation.acl-policy.class";

View File

@@ -3390,4 +3390,33 @@
<value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.DefaultConfigurationMutationACLPolicy</value>
</property>
<property>
<description>
The storage path for the LevelDB implementation of the configuration store,
when yarn.scheduler.configuration.store.class is configured to be
"leveldb".
</description>
<name>yarn.scheduler.configuration.leveldb-store.path</name>
<value>${hadoop.tmp.dir}/yarn/system/confstore</value>
</property>
<property>
<description>
The compaction interval for the LevelDB configuration store, in seconds,
when yarn.scheduler.configuration.store.class is configured to be
"leveldb". Default is one day.
</description>
<name>yarn.scheduler.configuration.leveldb-store.compaction-interval-secs</name>
<value>86400</value>
</property>
<property>
<description>
The maximum number of configuration change log entries kept in the LevelDB
config store, when yarn.scheduler.configuration.store.class is configured to be
"leveldb". Default is 1000.
</description>
<name>yarn.scheduler.configuration.leveldb-store.max-logs</name>
<value>1000</value>
</property>
</configuration>
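
For illustration, the equivalent programmatic setup might look like the following sketch (not part of this patch; the store path value is an example):

Configuration conf = new Configuration();
// Select the LevelDB-backed store ("leveldb") instead of the default "memory".
conf.set(YarnConfiguration.SCHEDULER_CONFIGURATION_STORE_CLASS,
    YarnConfiguration.LEVELDB_CONFIGURATION_STORE);
// Directory for the DB files; defaults under ${hadoop.tmp.dir} if unset.
conf.set(YarnConfiguration.RM_SCHEDCONF_STORE_PATH,
    "/var/yarn/system/confstore");
// Optional tuning: daily compaction, up to 1000 retained log entries.
conf.setLong(
    YarnConfiguration.RM_SCHEDCONF_LEVELDB_COMPACTION_INTERVAL_SECS, 86400L);
conf.setInt(YarnConfiguration.RM_SCHEDCONF_LEVELDB_MAX_LOGS, 1000);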

View File

@@ -28,6 +28,12 @@
*/
public interface MutableConfigurationProvider {
/**
* Apply transactions which were not committed.
* @throws IOException if recovery fails
*/
void recoverConf() throws IOException;
/**
* Update the scheduler configuration with the provided key value pairs.
* @param user User issuing the request

View File

@@ -393,6 +393,9 @@ public void serviceInit(Configuration conf) throws Exception {
@Override
public void serviceStart() throws Exception {
startSchedulerThreads();
if (this.csConfProvider instanceof MutableConfigurationProvider) {
((MutableConfigurationProvider) csConfProvider).recoverConf();
}
super.serviceStart();
}

View File

@@ -0,0 +1,314 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.fusesource.leveldbjni.JniDBFactory;
import org.fusesource.leveldbjni.internal.NativeDB;
import org.iq80.leveldb.DB;
import org.iq80.leveldb.DBComparator;
import org.iq80.leveldb.DBException;
import org.iq80.leveldb.DBIterator;
import org.iq80.leveldb.Options;
import org.iq80.leveldb.WriteBatch;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectInputStream;
import java.io.ObjectOutput;
import java.io.ObjectOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import static org.fusesource.leveldbjni.JniDBFactory.bytes;
/**
* A LevelDB implementation of {@link YarnConfigurationStore}.
*/
public class LeveldbConfigurationStore implements YarnConfigurationStore {
public static final Log LOG =
LogFactory.getLog(LeveldbConfigurationStore.class);
private static final String DB_NAME = "yarn-conf-store";
private static final String LOG_PREFIX = "log.";
private static final String LOG_COMMITTED_TXN = "committedTxn";
private DB db;
private long txnId = 0;
private long minTxn = 0;
private long maxLogs;
private Configuration conf;
private LinkedList<LogMutation> pendingMutations = new LinkedList<>();
private Timer compactionTimer;
private long compactionIntervalMsec;
@Override
public void initialize(Configuration config, Configuration schedConf)
throws IOException {
this.conf = config;
try {
this.db = initDatabase(schedConf);
this.txnId = Long.parseLong(new String(db.get(bytes(LOG_COMMITTED_TXN)),
StandardCharsets.UTF_8));
DBIterator itr = db.iterator();
itr.seek(bytes(LOG_PREFIX + txnId));
// Seek to first uncommitted log
itr.next();
while (itr.hasNext()) {
Map.Entry<byte[], byte[]> entry = itr.next();
if (!new String(entry.getKey(), StandardCharsets.UTF_8)
.startsWith(LOG_PREFIX)) {
break;
}
pendingMutations.add(deserLogMutation(entry.getValue()));
}
// Get the earliest txnId stored in logs
itr.seekToFirst();
if (itr.hasNext()) {
Map.Entry<byte[], byte[]> entry = itr.next();
byte[] key = entry.getKey();
String logId = new String(key, StandardCharsets.UTF_8);
if (logId.startsWith(LOG_PREFIX)) {
minTxn = Long.parseLong(logId.substring(logId.indexOf('.') + 1));
}
}
this.maxLogs = config.getLong(
YarnConfiguration.RM_SCHEDCONF_LEVELDB_MAX_LOGS,
YarnConfiguration.DEFAULT_RM_SCHEDCONF_LEVELDB_MAX_LOGS);
this.compactionIntervalMsec = config.getLong(
YarnConfiguration.RM_SCHEDCONF_LEVELDB_COMPACTION_INTERVAL_SECS,
YarnConfiguration
.DEFAULT_RM_SCHEDCONF_LEVELDB_COMPACTION_INTERVAL_SECS) * 1000;
startCompactionTimer();
} catch (Exception e) {
throw new IOException(e);
}
}
private DB initDatabase(Configuration config) throws Exception {
Path storeRoot = createStorageDir();
Options options = new Options();
options.createIfMissing(false);
options.comparator(new DBComparator() {
@Override
public int compare(byte[] key1, byte[] key2) {
String key1Str = new String(key1, StandardCharsets.UTF_8);
String key2Str = new String(key2, StandardCharsets.UTF_8);
int key1Txn = Integer.MAX_VALUE;
int key2Txn = Integer.MAX_VALUE;
if (key1Str.startsWith(LOG_PREFIX)) {
key1Txn = Integer.parseInt(key1Str.substring(
key1Str.indexOf('.') + 1));
}
if (key2Str.startsWith(LOG_PREFIX)) {
key2Txn = Integer.parseInt(key2Str.substring(
key2Str.indexOf('.') + 1));
}
// TODO txnId could overflow, in theory
if (key1Txn == Integer.MAX_VALUE && key2Txn == Integer.MAX_VALUE) {
if (key1Str.equals(key2Str) && key1Str.equals(LOG_COMMITTED_TXN)) {
return 0;
} else if (key1Str.equals(LOG_COMMITTED_TXN)) {
return -1;
} else if (key2Str.equals(LOG_COMMITTED_TXN)) {
return 1;
}
return key1Str.compareTo(key2Str);
}
return key1Txn - key2Txn;
}
@Override
public String name() {
return "logComparator";
}
@Override
public byte[] findShortestSeparator(byte[] start, byte[] limit) {
return start;
}
@Override
public byte[] findShortSuccessor(byte[] key) {
return key;
}
});
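// Resulting key order in the DB: log.1 < log.2 < ... < committedTxn
// < configuration keys (in lexicographic order). retrieve() relies on
// this layout: it seeks to committedTxn and reads everything after it
// as configuration, while initialize() seeks to log.<committedTxn> and
// scans forward over the uncommitted tail of the log.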
LOG.info("Using conf database at " + storeRoot);
File dbfile = new File(storeRoot.toString());
try {
db = JniDBFactory.factory.open(dbfile, options);
} catch (NativeDB.DBException e) {
if (e.isNotFound() || e.getMessage().contains(" does not exist ")) {
LOG.info("Creating conf database at " + dbfile);
options.createIfMissing(true);
try {
db = JniDBFactory.factory.open(dbfile, options);
// Write the initial scheduler configuration
WriteBatch initBatch = db.createWriteBatch();
for (Map.Entry<String, String> kv : config) {
initBatch.put(bytes(kv.getKey()), bytes(kv.getValue()));
}
initBatch.put(bytes(LOG_COMMITTED_TXN), bytes("0"));
db.write(initBatch);
} catch (DBException dbErr) {
throw new IOException(dbErr.getMessage(), dbErr);
}
} else {
throw e;
}
}
return db;
}
private Path createStorageDir() throws IOException {
Path root = getStorageDir();
FileSystem fs = FileSystem.getLocal(conf);
fs.mkdirs(root, new FsPermission((short) 0700));
return root;
}
private Path getStorageDir() throws IOException {
String storePath = conf.get(YarnConfiguration.RM_SCHEDCONF_STORE_PATH);
if (storePath == null) {
throw new IOException("No store location directory configured in " +
YarnConfiguration.RM_SCHEDCONF_STORE_PATH);
}
return new Path(storePath, DB_NAME);
}
@Override
public synchronized long logMutation(LogMutation logMutation)
throws IOException {
logMutation.setId(++txnId);
WriteBatch logBatch = db.createWriteBatch();
logBatch.put(bytes(LOG_PREFIX + txnId), serLogMutation(logMutation));
if (txnId - minTxn >= maxLogs) {
logBatch.delete(bytes(LOG_PREFIX + minTxn));
minTxn++;
}
db.write(logBatch);
pendingMutations.add(logMutation);
return txnId;
}
@Override
public synchronized boolean confirmMutation(long id, boolean isValid)
throws IOException {
WriteBatch updateBatch = db.createWriteBatch();
if (isValid) {
LogMutation mutation = deserLogMutation(db.get(bytes(LOG_PREFIX + id)));
for (Map.Entry<String, String> changes :
mutation.getUpdates().entrySet()) {
if (changes.getValue() == null || changes.getValue().isEmpty()) {
updateBatch.delete(bytes(changes.getKey()));
} else {
updateBatch.put(bytes(changes.getKey()), bytes(changes.getValue()));
}
}
}
updateBatch.put(bytes(LOG_COMMITTED_TXN), bytes(String.valueOf(id)));
db.write(updateBatch);
// Assumes logMutation and confirmMutation are done in the same
// synchronized method. For example,
// {@link MutableCSConfigurationProvider#mutateConfiguration(
// UserGroupInformation user, SchedConfUpdateInfo confUpdate)}
pendingMutations.removeFirst();
return true;
}
private byte[] serLogMutation(LogMutation mutation) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try (ObjectOutput oos = new ObjectOutputStream(baos)) {
oos.writeObject(mutation);
oos.flush();
return baos.toByteArray();
}
}
private LogMutation deserLogMutation(byte[] mutation) throws IOException {
try (ObjectInput input = new ObjectInputStream(
new ByteArrayInputStream(mutation))) {
return (LogMutation) input.readObject();
} catch (ClassNotFoundException e) {
throw new IOException(e);
}
}
@Override
public synchronized Configuration retrieve() {
DBIterator itr = db.iterator();
itr.seek(bytes(LOG_COMMITTED_TXN));
Configuration config = new Configuration(false);
itr.next();
while (itr.hasNext()) {
Map.Entry<byte[], byte[]> entry = itr.next();
config.set(new String(entry.getKey(), StandardCharsets.UTF_8),
new String(entry.getValue(), StandardCharsets.UTF_8));
}
return config;
}
@Override
public List<LogMutation> getPendingMutations() {
return pendingMutations;
}
@Override
public List<LogMutation> getConfirmedConfHistory(long fromId) {
return null; // unimplemented
}
// TODO below was taken from LeveldbRMStateStore, it can probably be
// refactored
private void startCompactionTimer() {
if (compactionIntervalMsec > 0) {
compactionTimer = new Timer(
this.getClass().getSimpleName() + " compaction timer", true);
compactionTimer.schedule(new CompactionTimerTask(),
compactionIntervalMsec, compactionIntervalMsec);
}
}
private class CompactionTimerTask extends TimerTask {
@Override
public void run() {
long start = Time.monotonicNow();
LOG.info("Starting full compaction cycle");
try {
db.compactRange(null, null);
} catch (DBException e) {
LOG.error("Error compacting database", e);
}
long duration = Time.monotonicNow() - start;
LOG.info("Full compaction cycle completed in " + duration + " msec");
}
}
}
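
As an illustrative sketch (not part of this patch) of the write-ahead flow the class implements, assuming the usual java.util and Hadoop imports, an example store path, and that LogMutation's constructor takes the update map and the requesting user:

Configuration conf = new Configuration();
conf.set(YarnConfiguration.RM_SCHEDCONF_STORE_PATH, "/tmp/yarn-conf-store");
Configuration schedConf = new Configuration(false);
schedConf.set("yarn.scheduler.capacity.root.queues", "default");

YarnConfigurationStore store = new LeveldbConfigurationStore();
store.initialize(conf, schedConf); // creates and seeds the DB on first run

Map<String, String> updates = new HashMap<>();
updates.put("yarn.scheduler.capacity.root.default.capacity", "50");
// Write-ahead: log the mutation first, then confirm it once validated.
long id = store.logMutation(
    new YarnConfigurationStore.LogMutation(updates, "admin"));
store.confirmMutation(id, true); // pass false to reject and discard it

Configuration persisted = store.retrieve(); // now contains the new capacity

Note that, as the in-code comment above points out, logMutation and confirmMutation are expected to run under the same lock, which is how MutableCSConfigurationProvider drives them.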

View File

@@ -19,6 +19,8 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
import com.google.common.base.Joiner;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
@@ -49,6 +51,9 @@
public class MutableCSConfigurationProvider implements CSConfigurationProvider,
MutableConfigurationProvider {
public static final Log LOG =
LogFactory.getLog(MutableCSConfigurationProvider.class);
private Configuration schedConf;
private YarnConfigurationStore confStore;
private ConfigurationMutationACLPolicy aclMutationPolicy;
@@ -68,6 +73,9 @@ public void init(Configuration config) throws IOException {
case YarnConfiguration.MEMORY_CONFIGURATION_STORE:
this.confStore = new InMemoryConfigurationStore();
break;
case YarnConfiguration.LEVELDB_CONFIGURATION_STORE:
this.confStore = new LeveldbConfigurationStore();
break;
default:
this.confStore = YarnConfigurationStoreFactory.getStore(config);
break;
@@ -82,6 +90,9 @@ public void init(Configuration config) throws IOException {
schedConf.set(kv.getKey(), kv.getValue());
}
confStore.initialize(config, schedConf);
// After initializing confStore, the store may already have an existing
// configuration. Use this one.
schedConf = confStore.retrieve();
this.aclMutationPolicy = ConfigurationMutationACLPolicyFactory
.getPolicy(config);
aclMutationPolicy.init(config, rmContext);
@@ -97,7 +108,7 @@ public CapacitySchedulerConfiguration loadConfiguration(Configuration
}
@Override
- public void mutateConfiguration(UserGroupInformation user,
+ public synchronized void mutateConfiguration(UserGroupInformation user,
SchedConfUpdateInfo confUpdate) throws IOException {
if (!aclMutationPolicy.isMutationAllowed(user, confUpdate)) {
throw new AccessControlException("User is not admin of all modified" +
@@ -124,6 +135,31 @@ public void mutateConfiguration(UserGroupInformation user,
confStore.confirmMutation(id, true);
}
@Override
public void recoverConf() throws IOException {
List<LogMutation> uncommittedLogs = confStore.getPendingMutations();
Configuration oldConf = new Configuration(schedConf);
for (LogMutation mutation : uncommittedLogs) {
for (Map.Entry<String, String> kv : mutation.getUpdates().entrySet()) {
if (kv.getValue() == null) {
schedConf.unset(kv.getKey());
} else {
schedConf.set(kv.getKey(), kv.getValue());
}
}
try {
rmContext.getScheduler().reinitialize(conf, rmContext);
} catch (IOException e) {
schedConf = oldConf;
confStore.confirmMutation(mutation.getId(), false);
LOG.info("Configuration mutation " + mutation.getId()
+ " was rejected", e);
continue;
}
confStore.confirmMutation(mutation.getId(), true);
LOG.info("Configuration mutation " + mutation.getId()+ " was accepted");
}
}
private Map<String, String> constructKeyValueConfUpdate(
SchedConfUpdateInfo mutationInfo) throws IOException {

View File

@@ -21,6 +21,8 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import java.io.IOException;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
@@ -43,7 +45,7 @@ public interface YarnConfigurationStore {
* LogMutation encapsulates the fields needed for configuration mutation
* audit logging and recovery.
*/
- class LogMutation {
+ class LogMutation implements Serializable {
private Map<String, String> updates;
private String user;
private long id;
@@ -106,16 +108,19 @@ public void setId(long id) {
* Initialize the configuration store.
* @param conf configuration to initialize store with
* @param schedConf Initial key-value configuration to persist
* @throws IOException if initialization fails
*/
- void initialize(Configuration conf, Configuration schedConf);
+ void initialize(Configuration conf, Configuration schedConf)
+     throws IOException;
/**
* Logs the configuration change to backing store. Generates an id associated
* with this mutation, sets it in {@code logMutation}, and returns it.
* @param logMutation configuration change to be persisted in write ahead log
* @return id which configuration store associates with this mutation
* @throws IOException if logging fails
*/
- long logMutation(LogMutation logMutation);
+ long logMutation(LogMutation logMutation) throws IOException;
/**
* Should be called after {@code logMutation}. Gets the pending mutation
@@ -130,8 +135,9 @@ public void setId(long id) {
* @param isValid if true, update persisted configuration with mutation
* associated with {@code id}.
* @return true on success
* @throws IOException if mutation confirmation fails
*/
- boolean confirmMutation(long id, boolean isValid);
+ boolean confirmMutation(long id, boolean isValid) throws IOException;
/**
* Retrieve the persisted configuration.

View File

@@ -23,6 +23,7 @@
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
@@ -43,7 +44,7 @@ public void setUp() {
}
@Test
- public void testInMemoryConfigurationStore() {
+ public void testInMemoryConfigurationStore() throws IOException {
confStore = new InMemoryConfigurationStore();
confStore.initialize(new Configuration(), schedConf);
assertEquals("val1", confStore.retrieve().get("key1"));
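
This patch does not add a LevelDB-specific test; a hedged sketch of one that could live in this test class (assuming a writable temp directory, a YarnConfiguration import, and the LogMutation(updates, user) constructor):

@Test
public void testLeveldbConfigurationStore() throws IOException {
  Configuration conf = new Configuration();
  // Hypothetical temp location for the store's DB files.
  conf.set(YarnConfiguration.RM_SCHEDCONF_STORE_PATH,
      System.getProperty("java.io.tmpdir") + "/test-conf-store");
  confStore = new LeveldbConfigurationStore();
  confStore.initialize(conf, schedConf);
  assertEquals("val1", confStore.retrieve().get("key1"));

  Map<String, String> update = new HashMap<>();
  update.put("key1", "val2");
  long id = confStore.logMutation(
      new YarnConfigurationStore.LogMutation(update, "testUser"));
  confStore.confirmMutation(id, true);
  assertEquals("val2", confStore.retrieve().get("key1"));
}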