diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index b603135cb8..ea8652d861 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -677,8 +677,21 @@ public static boolean isAclEnabled(Configuration conf) {
public static final String SCHEDULER_CONFIGURATION_STORE_CLASS =
YARN_PREFIX + "scheduler.configuration.store.class";
public static final String MEMORY_CONFIGURATION_STORE = "memory";
+ public static final String LEVELDB_CONFIGURATION_STORE = "leveldb";
public static final String DEFAULT_CONFIGURATION_STORE =
MEMORY_CONFIGURATION_STORE;
+ public static final String RM_SCHEDCONF_STORE_PATH = YARN_PREFIX
+ + "scheduler.configuration.leveldb-store.path";
+
+ public static final String RM_SCHEDCONF_LEVELDB_COMPACTION_INTERVAL_SECS =
+ YARN_PREFIX
+ + "scheduler.configuration.leveldb-store.compaction-interval-secs";
+ public static final long
+ DEFAULT_RM_SCHEDCONF_LEVELDB_COMPACTION_INTERVAL_SECS = 60 * 60 * 24L;
+
+ public static final String RM_SCHEDCONF_LEVELDB_MAX_LOGS =
+ YARN_PREFIX + "scheduler.configuration.leveldb-store.max-logs";
+ public static final int DEFAULT_RM_SCHEDCONF_LEVELDB_MAX_LOGS = 1000;
public static final String RM_SCHEDULER_MUTATION_ACL_POLICY_CLASS =
YARN_PREFIX + "scheduler.configuration.mutation.acl-policy.class";
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index e124c5b14a..5afec1bacd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -3390,4 +3390,33 @@
org.apache.hadoop.yarn.server.resourcemanager.scheduler.DefaultConfigurationMutationACLPolicy
+
+
+ The storage path for LevelDB implementation of configuration store,
+ when yarn.scheduler.configuration.store.class is configured to be
+ "leveldb".
+
+ yarn.scheduler.configuration.leveldb-store.path
+ ${hadoop.tmp.dir}/yarn/system/confstore
+
+
+
+
+ The compaction interval for LevelDB configuration store in secs,
+ when yarn.scheduler.configuration.store.class is configured to be
+ "leveldb". Default is one day.
+
+ yarn.scheduler.configuration.leveldb-store.compaction-interval-secs
+ 86400
+
+
+
+
+ The max number of configuration change log entries kept in LevelDB config
+ store, when yarn.scheduler.configuration.store.class is configured to be
+ "leveldb". Default is 1000.
+
+ yarn.scheduler.configuration.leveldb-store.max-logs
+ 1000
+
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java
index 86be7c378a..1f134677da 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java
@@ -28,6 +28,12 @@
*/
public interface MutableConfigurationProvider {
+ /**
+ * Apply transactions which were not committed.
+ * @throws IOException if recovery fails
+ */
+ void recoverConf() throws IOException;
+
/**
* Update the scheduler configuration with the provided key value pairs.
* @param user User issuing the request
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index da395b72d1..6d2de7ed95 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -393,6 +393,9 @@ public void serviceInit(Configuration conf) throws Exception {
@Override
public void serviceStart() throws Exception {
startSchedulerThreads();
+ if (this.csConfProvider instanceof MutableConfigurationProvider) {
+ ((MutableConfigurationProvider) csConfProvider).recoverConf();
+ }
super.serviceStart();
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java
new file mode 100644
index 0000000000..15346850c6
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java
@@ -0,0 +1,314 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.fusesource.leveldbjni.JniDBFactory;
+import org.fusesource.leveldbjni.internal.NativeDB;
+import org.iq80.leveldb.DB;
+import org.iq80.leveldb.DBComparator;
+import org.iq80.leveldb.DBException;
+import org.iq80.leveldb.DBIterator;
+import org.iq80.leveldb.Options;
+import org.iq80.leveldb.WriteBatch;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutput;
+import java.io.ObjectOutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import static org.fusesource.leveldbjni.JniDBFactory.bytes;
+
+/**
+ * A LevelDB implementation of {@link YarnConfigurationStore}.
+ */
+public class LeveldbConfigurationStore implements YarnConfigurationStore {
+
+ public static final Log LOG =
+ LogFactory.getLog(LeveldbConfigurationStore.class);
+
+ private static final String DB_NAME = "yarn-conf-store";
+ private static final String LOG_PREFIX = "log.";
+ private static final String LOG_COMMITTED_TXN = "committedTxn";
+
+ private DB db;
+ private long txnId = 0;
+ private long minTxn = 0;
+ private long maxLogs;
+ private Configuration conf;
+ private LinkedList pendingMutations = new LinkedList<>();
+ private Timer compactionTimer;
+ private long compactionIntervalMsec;
+
+ @Override
+ public void initialize(Configuration config, Configuration schedConf)
+ throws IOException {
+ this.conf = config;
+ try {
+ this.db = initDatabase(schedConf);
+ this.txnId = Long.parseLong(new String(db.get(bytes(LOG_COMMITTED_TXN)),
+ StandardCharsets.UTF_8));
+ DBIterator itr = db.iterator();
+ itr.seek(bytes(LOG_PREFIX + txnId));
+ // Seek to first uncommitted log
+ itr.next();
+ while (itr.hasNext()) {
+ Map.Entry entry = itr.next();
+ if (!new String(entry.getKey(), StandardCharsets.UTF_8)
+ .startsWith(LOG_PREFIX)) {
+ break;
+ }
+ pendingMutations.add(deserLogMutation(entry.getValue()));
+ }
+ // Get the earliest txnId stored in logs
+ itr.seekToFirst();
+ if (itr.hasNext()) {
+ Map.Entry entry = itr.next();
+ byte[] key = entry.getKey();
+ String logId = new String(key, StandardCharsets.UTF_8);
+ if (logId.startsWith(LOG_PREFIX)) {
+ minTxn = Long.parseLong(logId.substring(logId.indexOf('.') + 1));
+ }
+ }
+ this.maxLogs = config.getLong(
+ YarnConfiguration.RM_SCHEDCONF_LEVELDB_MAX_LOGS,
+ YarnConfiguration.DEFAULT_RM_SCHEDCONF_LEVELDB_MAX_LOGS);
+ this.compactionIntervalMsec = config.getLong(
+ YarnConfiguration.RM_SCHEDCONF_LEVELDB_COMPACTION_INTERVAL_SECS,
+ YarnConfiguration
+ .DEFAULT_RM_SCHEDCONF_LEVELDB_COMPACTION_INTERVAL_SECS) * 1000;
+ startCompactionTimer();
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+
+ private DB initDatabase(Configuration config) throws Exception {
+ Path storeRoot = createStorageDir();
+ Options options = new Options();
+ options.createIfMissing(false);
+ options.comparator(new DBComparator() {
+ @Override
+ public int compare(byte[] key1, byte[] key2) {
+ String key1Str = new String(key1, StandardCharsets.UTF_8);
+ String key2Str = new String(key2, StandardCharsets.UTF_8);
+ int key1Txn = Integer.MAX_VALUE;
+ int key2Txn = Integer.MAX_VALUE;
+ if (key1Str.startsWith(LOG_PREFIX)) {
+ key1Txn = Integer.parseInt(key1Str.substring(
+ key1Str.indexOf('.') + 1));
+ }
+ if (key2Str.startsWith(LOG_PREFIX)) {
+ key2Txn = Integer.parseInt(key2Str.substring(
+ key2Str.indexOf('.') + 1));
+ }
+ // TODO txnId could overflow, in theory
+ if (key1Txn == Integer.MAX_VALUE && key2Txn == Integer.MAX_VALUE) {
+ if (key1Str.equals(key2Str) && key1Str.equals(LOG_COMMITTED_TXN)) {
+ return 0;
+ } else if (key1Str.equals(LOG_COMMITTED_TXN)) {
+ return -1;
+ } else if (key2Str.equals(LOG_COMMITTED_TXN)) {
+ return 1;
+ }
+ return key1Str.compareTo(key2Str);
+ }
+ return key1Txn - key2Txn;
+ }
+
+ @Override
+ public String name() {
+ return "logComparator";
+ }
+
+ public byte[] findShortestSeparator(byte[] start, byte[] limit) {
+ return start;
+ }
+
+ public byte[] findShortSuccessor(byte[] key) {
+ return key;
+ }
+ });
+ LOG.info("Using conf database at " + storeRoot);
+ File dbfile = new File(storeRoot.toString());
+ try {
+ db = JniDBFactory.factory.open(dbfile, options);
+ } catch (NativeDB.DBException e) {
+ if (e.isNotFound() || e.getMessage().contains(" does not exist ")) {
+ LOG.info("Creating conf database at " + dbfile);
+ options.createIfMissing(true);
+ try {
+ db = JniDBFactory.factory.open(dbfile, options);
+ // Write the initial scheduler configuration
+ WriteBatch initBatch = db.createWriteBatch();
+ for (Map.Entry kv : config) {
+ initBatch.put(bytes(kv.getKey()), bytes(kv.getValue()));
+ }
+ initBatch.put(bytes(LOG_COMMITTED_TXN), bytes("0"));
+ db.write(initBatch);
+ } catch (DBException dbErr) {
+ throw new IOException(dbErr.getMessage(), dbErr);
+ }
+ } else {
+ throw e;
+ }
+ }
+ return db;
+ }
+
+ private Path createStorageDir() throws IOException {
+ Path root = getStorageDir();
+ FileSystem fs = FileSystem.getLocal(conf);
+ fs.mkdirs(root, new FsPermission((short) 0700));
+ return root;
+ }
+
+ private Path getStorageDir() throws IOException {
+ String storePath = conf.get(YarnConfiguration.RM_SCHEDCONF_STORE_PATH);
+ if (storePath == null) {
+ throw new IOException("No store location directory configured in " +
+ YarnConfiguration.RM_SCHEDCONF_STORE_PATH);
+ }
+ return new Path(storePath, DB_NAME);
+ }
+
+ @Override
+ public synchronized long logMutation(LogMutation logMutation)
+ throws IOException {
+ logMutation.setId(++txnId);
+ WriteBatch logBatch = db.createWriteBatch();
+ logBatch.put(bytes(LOG_PREFIX + txnId), serLogMutation(logMutation));
+ if (txnId - minTxn >= maxLogs) {
+ logBatch.delete(bytes(LOG_PREFIX + minTxn));
+ minTxn++;
+ }
+ db.write(logBatch);
+ pendingMutations.add(logMutation);
+ return txnId;
+ }
+
+ @Override
+ public synchronized boolean confirmMutation(long id, boolean isValid)
+ throws IOException {
+ WriteBatch updateBatch = db.createWriteBatch();
+ if (isValid) {
+ LogMutation mutation = deserLogMutation(db.get(bytes(LOG_PREFIX + id)));
+ for (Map.Entry changes :
+ mutation.getUpdates().entrySet()) {
+ if (changes.getValue() == null || changes.getValue().isEmpty()) {
+ updateBatch.delete(bytes(changes.getKey()));
+ } else {
+ updateBatch.put(bytes(changes.getKey()), bytes(changes.getValue()));
+ }
+ }
+ }
+ updateBatch.put(bytes(LOG_COMMITTED_TXN), bytes(String.valueOf(id)));
+ db.write(updateBatch);
+ // Assumes logMutation and confirmMutation are done in the same
+ // synchronized method. For example,
+ // {@link MutableCSConfigurationProvider#mutateConfiguration(
+ // UserGroupInformation user, SchedConfUpdateInfo confUpdate)}
+ pendingMutations.removeFirst();
+ return true;
+ }
+
+ private byte[] serLogMutation(LogMutation mutation) throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ try (ObjectOutput oos = new ObjectOutputStream(baos)) {
+ oos.writeObject(mutation);
+ oos.flush();
+ return baos.toByteArray();
+ }
+ }
+ private LogMutation deserLogMutation(byte[] mutation) throws IOException {
+ try (ObjectInput input = new ObjectInputStream(
+ new ByteArrayInputStream(mutation))) {
+ return (LogMutation) input.readObject();
+ } catch (ClassNotFoundException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public synchronized Configuration retrieve() {
+ DBIterator itr = db.iterator();
+ itr.seek(bytes(LOG_COMMITTED_TXN));
+ Configuration config = new Configuration(false);
+ itr.next();
+ while (itr.hasNext()) {
+ Map.Entry entry = itr.next();
+ config.set(new String(entry.getKey(), StandardCharsets.UTF_8),
+ new String(entry.getValue(), StandardCharsets.UTF_8));
+ }
+ return config;
+ }
+
+ @Override
+ public List getPendingMutations() {
+ return pendingMutations;
+ }
+
+ @Override
+ public List getConfirmedConfHistory(long fromId) {
+ return null; // unimplemented
+ }
+
+ // TODO below was taken from LeveldbRMStateStore, it can probably be
+ // refactored
+ private void startCompactionTimer() {
+ if (compactionIntervalMsec > 0) {
+ compactionTimer = new Timer(
+ this.getClass().getSimpleName() + " compaction timer", true);
+ compactionTimer.schedule(new CompactionTimerTask(),
+ compactionIntervalMsec, compactionIntervalMsec);
+ }
+ }
+
+ private class CompactionTimerTask extends TimerTask {
+ @Override
+ public void run() {
+ long start = Time.monotonicNow();
+ LOG.info("Starting full compaction cycle");
+ try {
+ db.compactRange(null, null);
+ } catch (DBException e) {
+ LOG.error("Error compacting database", e);
+ }
+ long duration = Time.monotonicNow() - start;
+ LOG.info("Full compaction cycle completed in " + duration + " msec");
+ }
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java
index 670c0f9573..9ccc146f19 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java
@@ -19,6 +19,8 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
import com.google.common.base.Joiner;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
@@ -49,6 +51,9 @@
public class MutableCSConfigurationProvider implements CSConfigurationProvider,
MutableConfigurationProvider {
+ public static final Log LOG =
+ LogFactory.getLog(MutableCSConfigurationProvider.class);
+
private Configuration schedConf;
private YarnConfigurationStore confStore;
private ConfigurationMutationACLPolicy aclMutationPolicy;
@@ -68,6 +73,9 @@ public void init(Configuration config) throws IOException {
case YarnConfiguration.MEMORY_CONFIGURATION_STORE:
this.confStore = new InMemoryConfigurationStore();
break;
+ case YarnConfiguration.LEVELDB_CONFIGURATION_STORE:
+ this.confStore = new LeveldbConfigurationStore();
+ break;
default:
this.confStore = YarnConfigurationStoreFactory.getStore(config);
break;
@@ -82,6 +90,9 @@ public void init(Configuration config) throws IOException {
schedConf.set(kv.getKey(), kv.getValue());
}
confStore.initialize(config, schedConf);
+ // After initializing confStore, the store may already have an existing
+ // configuration. Use this one.
+ schedConf = confStore.retrieve();
this.aclMutationPolicy = ConfigurationMutationACLPolicyFactory
.getPolicy(config);
aclMutationPolicy.init(config, rmContext);
@@ -97,7 +108,7 @@ public CapacitySchedulerConfiguration loadConfiguration(Configuration
}
@Override
- public void mutateConfiguration(UserGroupInformation user,
+ public synchronized void mutateConfiguration(UserGroupInformation user,
SchedConfUpdateInfo confUpdate) throws IOException {
if (!aclMutationPolicy.isMutationAllowed(user, confUpdate)) {
throw new AccessControlException("User is not admin of all modified" +
@@ -124,6 +135,31 @@ public void mutateConfiguration(UserGroupInformation user,
confStore.confirmMutation(id, true);
}
+ @Override
+ public void recoverConf() throws IOException {
+ List uncommittedLogs = confStore.getPendingMutations();
+ Configuration oldConf = new Configuration(schedConf);
+ for (LogMutation mutation : uncommittedLogs) {
+ for (Map.Entry kv : mutation.getUpdates().entrySet()) {
+ if (kv.getValue() == null) {
+ schedConf.unset(kv.getKey());
+ } else {
+ schedConf.set(kv.getKey(), kv.getValue());
+ }
+ }
+ try {
+ rmContext.getScheduler().reinitialize(conf, rmContext);
+ } catch (IOException e) {
+ schedConf = oldConf;
+ confStore.confirmMutation(mutation.getId(), false);
+ LOG.info("Configuration mutation " + mutation.getId()
+ + " was rejected", e);
+ continue;
+ }
+ confStore.confirmMutation(mutation.getId(), true);
+ LOG.info("Configuration mutation " + mutation.getId()+ " was accepted");
+ }
+ }
private Map constructKeyValueConfUpdate(
SchedConfUpdateInfo mutationInfo) throws IOException {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java
index 22c0ef8f5e..065c877eeb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java
@@ -21,6 +21,8 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import java.io.IOException;
+import java.io.Serializable;
import java.util.List;
import java.util.Map;
@@ -43,7 +45,7 @@ public interface YarnConfigurationStore {
* LogMutation encapsulates the fields needed for configuration mutation
* audit logging and recovery.
*/
- class LogMutation {
+ class LogMutation implements Serializable {
private Map updates;
private String user;
private long id;
@@ -106,16 +108,19 @@ public void setId(long id) {
* Initialize the configuration store.
* @param conf configuration to initialize store with
* @param schedConf Initial key-value configuration to persist
+ * @throws IOException if initialization fails
*/
- void initialize(Configuration conf, Configuration schedConf);
+ void initialize(Configuration conf, Configuration schedConf)
+ throws IOException;
/**
* Logs the configuration change to backing store. Generates an id associated
* with this mutation, sets it in {@code logMutation}, and returns it.
* @param logMutation configuration change to be persisted in write ahead log
* @return id which configuration store associates with this mutation
+ * @throws IOException if logging fails
*/
- long logMutation(LogMutation logMutation);
+ long logMutation(LogMutation logMutation) throws IOException;
/**
* Should be called after {@code logMutation}. Gets the pending mutation
@@ -130,8 +135,9 @@ public void setId(long id) {
* @param isValid if true, update persisted configuration with mutation
* associated with {@code id}.
* @return true on success
+ * @throws IOException if mutation confirmation fails
*/
- boolean confirmMutation(long id, boolean isValid);
+ boolean confirmMutation(long id, boolean isValid) throws IOException;
/**
* Retrieve the persisted configuration.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestYarnConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestYarnConfigurationStore.java
index dff4e77470..631ce657e8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestYarnConfigurationStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestYarnConfigurationStore.java
@@ -23,6 +23,7 @@
import org.junit.Before;
import org.junit.Test;
+import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
@@ -43,7 +44,7 @@ public void setUp() {
}
@Test
- public void testInMemoryConfigurationStore() {
+ public void testInMemoryConfigurationStore() throws IOException {
confStore = new InMemoryConfigurationStore();
confStore.initialize(new Configuration(), schedConf);
assertEquals("val1", confStore.retrieve().get("key1"));