YARN-6840. Implement zookeeper based store for scheduler configuration updates. (Jonathan Hung via wangda)

Change-Id: I9debea674fe8c7e4109d4ca136965a1ea4c48bcc
This commit is contained in:
Wangda Tan 2017-09-18 09:53:42 -07:00 committed by Jonathan Hung
parent 4d8abd84f4
commit ff39c0de20
20 changed files with 1040 additions and 425 deletions

View File

@ -678,6 +678,7 @@ public static boolean isAclEnabled(Configuration conf) {
YARN_PREFIX + "scheduler.configuration.store.class"; YARN_PREFIX + "scheduler.configuration.store.class";
public static final String MEMORY_CONFIGURATION_STORE = "memory"; public static final String MEMORY_CONFIGURATION_STORE = "memory";
public static final String LEVELDB_CONFIGURATION_STORE = "leveldb"; public static final String LEVELDB_CONFIGURATION_STORE = "leveldb";
public static final String ZK_CONFIGURATION_STORE = "zk";
public static final String DEFAULT_CONFIGURATION_STORE = public static final String DEFAULT_CONFIGURATION_STORE =
MEMORY_CONFIGURATION_STORE; MEMORY_CONFIGURATION_STORE;
public static final String RM_SCHEDCONF_STORE_PATH = YARN_PREFIX public static final String RM_SCHEDCONF_STORE_PATH = YARN_PREFIX
@ -689,9 +690,16 @@ public static boolean isAclEnabled(Configuration conf) {
public static final long public static final long
DEFAULT_RM_SCHEDCONF_LEVELDB_COMPACTION_INTERVAL_SECS = 60 * 60 * 24L; DEFAULT_RM_SCHEDCONF_LEVELDB_COMPACTION_INTERVAL_SECS = 60 * 60 * 24L;
public static final String RM_SCHEDCONF_LEVELDB_MAX_LOGS = public static final String RM_SCHEDCONF_MAX_LOGS =
YARN_PREFIX + "scheduler.configuration.leveldb-store.max-logs"; YARN_PREFIX + "scheduler.configuration.store.max-logs";
public static final int DEFAULT_RM_SCHEDCONF_LEVELDB_MAX_LOGS = 1000; public static final long DEFAULT_RM_SCHEDCONF_LEVELDB_MAX_LOGS = 1000;
public static final long DEFAULT_RM_SCHEDCONF_ZK_MAX_LOGS = 1000;
/** Parent znode path under which ZKConfigurationStore will create znodes. */
public static final String RM_SCHEDCONF_STORE_ZK_PARENT_PATH = YARN_PREFIX
+ "scheduler.configuration.zk-store.parent-path";
public static final String DEFAULT_RM_SCHEDCONF_STORE_ZK_PARENT_PATH =
"/confstore";
public static final String RM_SCHEDULER_MUTATION_ACL_POLICY_CLASS = public static final String RM_SCHEDULER_MUTATION_ACL_POLICY_CLASS =
YARN_PREFIX + "scheduler.configuration.mutation.acl-policy.class"; YARN_PREFIX + "scheduler.configuration.mutation.acl-policy.class";

View File

@ -3412,11 +3412,20 @@
<property> <property>
<description> <description>
The max number of configuration change log entries kept in LevelDB config The max number of configuration change log entries kept in config
store, when yarn.scheduler.configuration.store.class is configured to be store, when yarn.scheduler.configuration.store.class is configured to be
"leveldb". Default is 1000. "leveldb" or "zk". Default is 1000 for either.
</description> </description>
<name>yarn.scheduler.configuration.leveldb-store.max-logs</name> <name>yarn.scheduler.configuration.store.max-logs</name>
<value>1000</value> <value>1000</value>
</property> </property>
<property>
<description>
ZK root node path for configuration store when using zookeeper-based
configuration store.
</description>
<name>yarn.scheduler.configuration.zk-store.parent-path</name>
<value>/confstore</value>
</property>
</configuration> </configuration>

View File

@ -387,9 +387,7 @@ public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request)
RefreshQueuesResponse response = RefreshQueuesResponse response =
recordFactory.newRecordInstance(RefreshQueuesResponse.class); recordFactory.newRecordInstance(RefreshQueuesResponse.class);
try { try {
ResourceScheduler scheduler = rm.getRMContext().getScheduler(); if (isSchedulerMutable()) {
if (scheduler instanceof MutableConfScheduler
&& ((MutableConfScheduler) scheduler).isConfigurationMutable()) {
throw new IOException("Scheduler configuration is mutable. " + throw new IOException("Scheduler configuration is mutable. " +
operation + " is not allowed in this scenario."); operation + " is not allowed in this scenario.");
} }
@ -413,6 +411,12 @@ public void refreshQueues() throws IOException, YarnException {
} }
} }
private boolean isSchedulerMutable() {
ResourceScheduler scheduler = rm.getRMContext().getScheduler();
return (scheduler instanceof MutableConfScheduler
&& ((MutableConfScheduler) scheduler).isConfigurationMutable());
}
@Override @Override
public RefreshNodesResponse refreshNodes(RefreshNodesRequest request) public RefreshNodesResponse refreshNodes(RefreshNodesRequest request)
throws YarnException, StandbyException { throws YarnException, StandbyException {
@ -721,6 +725,14 @@ private synchronized Configuration getConfiguration(Configuration conf,
void refreshAll() throws ServiceFailedException { void refreshAll() throws ServiceFailedException {
try { try {
checkAcls("refreshAll"); checkAcls("refreshAll");
if (isSchedulerMutable()) {
try {
((MutableConfScheduler) rm.getRMContext().getScheduler())
.getMutableConfProvider().reloadConfigurationFromStore();
} catch (Exception e) {
throw new IOException("Failed to refresh configuration:", e);
}
}
refreshQueues(); refreshQueues();
refreshNodes(); refreshNodes();
refreshSuperUserGroupsConfiguration(); refreshSuperUserGroupsConfiguration();

View File

@ -351,7 +351,7 @@ protected EmbeddedElector createEmbeddedElector() throws IOException {
conf.getBoolean(YarnConfiguration.CURATOR_LEADER_ELECTOR, conf.getBoolean(YarnConfiguration.CURATOR_LEADER_ELECTOR,
YarnConfiguration.DEFAULT_CURATOR_LEADER_ELECTOR_ENABLED); YarnConfiguration.DEFAULT_CURATOR_LEADER_ELECTOR_ENABLED);
if (curatorEnabled) { if (curatorEnabled) {
this.zkManager = createAndStartZKManager(conf); this.zkManager = getAndStartZKManager(conf);
elector = new CuratorBasedElectorService(this); elector = new CuratorBasedElectorService(this);
} else { } else {
elector = new ActiveStandbyElectorBasedElectorService(this); elector = new ActiveStandbyElectorBasedElectorService(this);
@ -360,13 +360,16 @@ protected EmbeddedElector createEmbeddedElector() throws IOException {
} }
/** /**
* Create and ZooKeeper Curator manager. * Get ZooKeeper Curator manager, creating and starting if not exists.
* @param config Configuration for the ZooKeeper curator. * @param config Configuration for the ZooKeeper curator.
* @return New ZooKeeper Curator manager. * @return ZooKeeper Curator manager.
* @throws IOException If it cannot create the manager. * @throws IOException If it cannot create the manager.
*/ */
public ZKCuratorManager createAndStartZKManager(Configuration config) public synchronized ZKCuratorManager getAndStartZKManager(Configuration
throws IOException { config) throws IOException {
if (this.zkManager != null) {
return zkManager;
}
ZKCuratorManager manager = new ZKCuratorManager(config); ZKCuratorManager manager = new ZKCuratorManager(config);
// Get authentication // Get authentication
@ -386,15 +389,8 @@ public ZKCuratorManager createAndStartZKManager(Configuration config)
} }
manager.start(authInfos); manager.start(authInfos);
return manager; this.zkManager = manager;
} return zkManager;
/**
* Get the ZooKeeper Curator manager.
* @return ZooKeeper Curator manager.
*/
public ZKCuratorManager getZKManager() {
return this.zkManager;
} }
public CuratorFramework getCurator() { public CuratorFramework getCurator() {

View File

@ -22,7 +22,7 @@
/** /**
* This exception is thrown by ResourceManager if it's loading an incompatible * This exception is thrown by ResourceManager if it's loading an incompatible
* version of state from state store on recovery. * version of storage on recovery.
*/ */
public class RMStateVersionIncompatibleException extends YarnException { public class RMStateVersionIncompatibleException extends YarnException {

View File

@ -327,10 +327,7 @@ public synchronized void initInternal(Configuration conf)
amrmTokenSecretManagerRoot = amrmTokenSecretManagerRoot =
getNodePath(zkRootNodePath, AMRMTOKEN_SECRET_MANAGER_ROOT); getNodePath(zkRootNodePath, AMRMTOKEN_SECRET_MANAGER_ROOT);
reservationRoot = getNodePath(zkRootNodePath, RESERVATION_SYSTEM_ROOT); reservationRoot = getNodePath(zkRootNodePath, RESERVATION_SYSTEM_ROOT);
zkManager = resourceManager.getZKManager(); zkManager = resourceManager.getAndStartZKManager(conf);
if (zkManager == null) {
zkManager = resourceManager.createAndStartZKManager(conf);
}
} }
@Override @Override

View File

@ -18,11 +18,6 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler; package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;
import java.io.IOException;
/** /**
* Interface for a scheduler that supports changing configuration at runtime. * Interface for a scheduler that supports changing configuration at runtime.
@ -30,16 +25,6 @@
*/ */
public interface MutableConfScheduler extends ResourceScheduler { public interface MutableConfScheduler extends ResourceScheduler {
/**
* Update the scheduler's configuration.
* @param user Caller of this update
* @param confUpdate configuration update
* @throws IOException if scheduler could not be reinitialized
* @throws YarnException if reservation system could not be reinitialized
*/
void updateConfiguration(UserGroupInformation user,
SchedConfUpdateInfo confUpdate) throws IOException, YarnException;
/** /**
* Get the scheduler configuration. * Get the scheduler configuration.
* @return the scheduler configuration * @return the scheduler configuration
@ -58,4 +43,11 @@ void updateConfiguration(UserGroupInformation user,
* @return whether scheduler configuration is mutable or not. * @return whether scheduler configuration is mutable or not.
*/ */
boolean isConfigurationMutable(); boolean isConfigurationMutable();
/**
* Get scheduler's configuration provider, so other classes can directly
* call mutation APIs on configuration provider.
* @return scheduler's configuration provider
*/
MutableConfigurationProvider getMutableConfProvider();
} }

View File

@ -19,30 +19,40 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler; package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo; import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;
import java.io.IOException;
/** /**
* Interface for allowing changing scheduler configurations. * Interface for allowing changing scheduler configurations.
*/ */
public interface MutableConfigurationProvider { public interface MutableConfigurationProvider {
/** /**
* Apply transactions which were not committed. * Get the acl mutation policy for this configuration provider.
* @throws IOException if recovery fails * @return The acl mutation policy.
*/ */
void recoverConf() throws IOException; ConfigurationMutationACLPolicy getAclMutationPolicy();
/** /**
* Update the scheduler configuration with the provided key value pairs. * Called when a new ResourceManager is starting/becomes active. Ensures
* @param user User issuing the request * configuration is up-to-date.
* @param confUpdate Key-value pairs for configurations to be updated. * @throws Exception if configuration could not be refreshed from store
* @throws IOException if scheduler could not be reinitialized
* @throws YarnException if reservation system could not be reinitialized
*/ */
void mutateConfiguration(UserGroupInformation user, SchedConfUpdateInfo void reloadConfigurationFromStore() throws Exception;
confUpdate) throws IOException, YarnException;
/**
* Log user's requested configuration mutation, and applies it in-memory.
* @param user User who requested the change
* @param confUpdate User's requested configuration change
* @throws Exception if logging the mutation fails
*/
void logAndApplyMutation(UserGroupInformation user, SchedConfUpdateInfo
confUpdate) throws Exception;
/**
* Confirm last logged mutation.
* @param isValid if the last logged mutation is applied to scheduler
* properly.
* @throws Exception if confirming mutation fails
*/
void confirmPendingMutation(boolean isValid) throws Exception;
} }

View File

@ -141,7 +141,6 @@
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
@ -393,9 +392,6 @@ public void serviceInit(Configuration conf) throws Exception {
@Override @Override
public void serviceStart() throws Exception { public void serviceStart() throws Exception {
startSchedulerThreads(); startSchedulerThreads();
if (this.csConfProvider instanceof MutableConfigurationProvider) {
((MutableConfigurationProvider) csConfProvider).recoverConf();
}
super.serviceStart(); super.serviceStart();
} }
@ -2618,20 +2614,16 @@ public long getMaximumApplicationLifetime(String queueName) {
return ((LeafQueue) queue).getMaximumApplicationLifetime(); return ((LeafQueue) queue).getMaximumApplicationLifetime();
} }
@Override
public void updateConfiguration(UserGroupInformation user,
SchedConfUpdateInfo confUpdate) throws IOException, YarnException {
if (isConfigurationMutable()) {
((MutableConfigurationProvider) csConfProvider).mutateConfiguration(
user, confUpdate);
} else {
throw new UnsupportedOperationException("Configured CS configuration " +
"provider does not support updating configuration.");
}
}
@Override @Override
public boolean isConfigurationMutable() { public boolean isConfigurationMutable() {
return csConfProvider instanceof MutableConfigurationProvider; return csConfProvider instanceof MutableConfigurationProvider;
} }
@Override
public MutableConfigurationProvider getMutableConfProvider() {
if (isConfigurationMutable()) {
return (MutableConfigurationProvider) csConfProvider;
}
return null;
}
} }

View File

@ -19,8 +19,9 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf; package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -28,48 +29,35 @@
* A default implementation of {@link YarnConfigurationStore}. Doesn't offer * A default implementation of {@link YarnConfigurationStore}. Doesn't offer
* persistent configuration storage, just stores the configuration in memory. * persistent configuration storage, just stores the configuration in memory.
*/ */
public class InMemoryConfigurationStore implements YarnConfigurationStore { public class InMemoryConfigurationStore extends YarnConfigurationStore {
private Configuration schedConf; private Configuration schedConf;
private LinkedList<LogMutation> pendingMutations; private LogMutation pendingMutation;
private long pendingId;
@Override @Override
public void initialize(Configuration conf, Configuration schedConf) { public void initialize(Configuration conf, Configuration schedConf,
RMContext rmContext) {
this.schedConf = schedConf; this.schedConf = schedConf;
this.pendingMutations = new LinkedList<>();
this.pendingId = 0;
} }
@Override @Override
public synchronized long logMutation(LogMutation logMutation) { public void logMutation(LogMutation logMutation) {
logMutation.setId(++pendingId); pendingMutation = logMutation;
pendingMutations.add(logMutation);
return pendingId;
} }
@Override @Override
public synchronized boolean confirmMutation(long id, boolean isValid) { public void confirmMutation(boolean isValid) {
LogMutation mutation = pendingMutations.poll(); if (isValid) {
// If confirmMutation is called out of order, discard mutations until id for (Map.Entry<String, String> kv : pendingMutation.getUpdates()
// is reached. .entrySet()) {
while (mutation != null) { if (kv.getValue() == null) {
if (mutation.getId() == id) { schedConf.unset(kv.getKey());
if (isValid) { } else {
Map<String, String> mutations = mutation.getUpdates(); schedConf.set(kv.getKey(), kv.getValue());
for (Map.Entry<String, String> kv : mutations.entrySet()) {
if (kv.getValue() == null) {
schedConf.unset(kv.getKey());
} else {
schedConf.set(kv.getKey(), kv.getValue());
}
}
} }
return true;
} }
mutation = pendingMutations.poll();
} }
return false; pendingMutation = null;
} }
@Override @Override
@ -77,14 +65,31 @@ public synchronized Configuration retrieve() {
return schedConf; return schedConf;
} }
@Override
public synchronized List<LogMutation> getPendingMutations() {
return new LinkedList<>(pendingMutations);
}
@Override @Override
public List<LogMutation> getConfirmedConfHistory(long fromId) { public List<LogMutation> getConfirmedConfHistory(long fromId) {
// Unimplemented. // Unimplemented.
return null; return null;
} }
@Override
public Version getConfStoreVersion() throws Exception {
// Does nothing.
return null;
}
@Override
public void storeVersion() throws Exception {
// Does nothing.
}
@Override
public Version getCurrentVersion() {
// Does nothing.
return null;
}
@Override
public void checkVersion() {
// Does nothing. (Version is always compatible since it's in memory)
}
} }

View File

@ -26,6 +26,10 @@
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.fusesource.leveldbjni.JniDBFactory; import org.fusesource.leveldbjni.JniDBFactory;
import org.fusesource.leveldbjni.internal.NativeDB; import org.fusesource.leveldbjni.internal.NativeDB;
import org.iq80.leveldb.DB; import org.iq80.leveldb.DB;
@ -55,58 +59,32 @@
/** /**
* A LevelDB implementation of {@link YarnConfigurationStore}. * A LevelDB implementation of {@link YarnConfigurationStore}.
*/ */
public class LeveldbConfigurationStore implements YarnConfigurationStore { public class LeveldbConfigurationStore extends YarnConfigurationStore {
public static final Log LOG = public static final Log LOG =
LogFactory.getLog(LeveldbConfigurationStore.class); LogFactory.getLog(LeveldbConfigurationStore.class);
private static final String DB_NAME = "yarn-conf-store"; private static final String DB_NAME = "yarn-conf-store";
private static final String LOG_PREFIX = "log."; private static final String LOG_KEY = "log";
private static final String LOG_COMMITTED_TXN = "committedTxn"; private static final String VERSION_KEY = "version";
private DB db; private DB db;
// Txnid for the last transaction logged to the store.
private long txnId = 0;
private long minTxn = 0;
private long maxLogs; private long maxLogs;
private Configuration conf; private Configuration conf;
private LinkedList<LogMutation> pendingMutations = new LinkedList<>(); private LogMutation pendingMutation;
private static final Version CURRENT_VERSION_INFO = Version
.newInstance(0, 1);
private Timer compactionTimer; private Timer compactionTimer;
private long compactionIntervalMsec; private long compactionIntervalMsec;
@Override @Override
public void initialize(Configuration config, Configuration schedConf) public void initialize(Configuration config, Configuration schedConf,
throws IOException { RMContext rmContext) throws IOException {
this.conf = config; this.conf = config;
try { try {
this.db = initDatabase(schedConf); this.db = initDatabase(schedConf);
this.txnId = Long.parseLong(new String(db.get(bytes(LOG_COMMITTED_TXN)),
StandardCharsets.UTF_8));
DBIterator itr = db.iterator();
itr.seek(bytes(LOG_PREFIX + txnId));
// Seek to first uncommitted log
itr.next();
while (itr.hasNext()) {
Map.Entry<byte[], byte[]> entry = itr.next();
if (!new String(entry.getKey(), StandardCharsets.UTF_8)
.startsWith(LOG_PREFIX)) {
break;
}
pendingMutations.add(deserLogMutation(entry.getValue()));
txnId++;
}
// Get the earliest txnId stored in logs
itr.seekToFirst();
if (itr.hasNext()) {
Map.Entry<byte[], byte[]> entry = itr.next();
byte[] key = entry.getKey();
String logId = new String(key, StandardCharsets.UTF_8);
if (logId.startsWith(LOG_PREFIX)) {
minTxn = Long.parseLong(logId.substring(logId.indexOf('.') + 1));
}
}
this.maxLogs = config.getLong( this.maxLogs = config.getLong(
YarnConfiguration.RM_SCHEDCONF_LEVELDB_MAX_LOGS, YarnConfiguration.RM_SCHEDCONF_MAX_LOGS,
YarnConfiguration.DEFAULT_RM_SCHEDCONF_LEVELDB_MAX_LOGS); YarnConfiguration.DEFAULT_RM_SCHEDCONF_LEVELDB_MAX_LOGS);
this.compactionIntervalMsec = config.getLong( this.compactionIntervalMsec = config.getLong(
YarnConfiguration.RM_SCHEDCONF_LEVELDB_COMPACTION_INTERVAL_SECS, YarnConfiguration.RM_SCHEDCONF_LEVELDB_COMPACTION_INTERVAL_SECS,
@ -127,33 +105,23 @@ private DB initDatabase(Configuration config) throws Exception {
public int compare(byte[] key1, byte[] key2) { public int compare(byte[] key1, byte[] key2) {
String key1Str = new String(key1, StandardCharsets.UTF_8); String key1Str = new String(key1, StandardCharsets.UTF_8);
String key2Str = new String(key2, StandardCharsets.UTF_8); String key2Str = new String(key2, StandardCharsets.UTF_8);
int key1Txn = Integer.MAX_VALUE; if (key1Str.equals(key2Str)) {
int key2Txn = Integer.MAX_VALUE; return 0;
if (key1Str.startsWith(LOG_PREFIX)) { } else if (key1Str.equals(VERSION_KEY)) {
key1Txn = Integer.parseInt(key1Str.substring( return -1;
key1Str.indexOf('.') + 1)); } else if (key2Str.equals(VERSION_KEY)) {
return 1;
} else if (key1Str.equals(LOG_KEY)) {
return -1;
} else if (key2Str.equals(LOG_KEY)) {
return 1;
} }
if (key2Str.startsWith(LOG_PREFIX)) { return key1Str.compareTo(key2Str);
key2Txn = Integer.parseInt(key2Str.substring(
key2Str.indexOf('.') + 1));
}
// TODO txnId could overflow, in theory
if (key1Txn == Integer.MAX_VALUE && key2Txn == Integer.MAX_VALUE) {
if (key1Str.equals(key2Str) && key1Str.equals(LOG_COMMITTED_TXN)) {
return 0;
} else if (key1Str.equals(LOG_COMMITTED_TXN)) {
return -1;
} else if (key2Str.equals(LOG_COMMITTED_TXN)) {
return 1;
}
return key1Str.compareTo(key2Str);
}
return key1Txn - key2Txn;
} }
@Override @Override
public String name() { public String name() {
return "logComparator"; return "keyComparator";
} }
public byte[] findShortestSeparator(byte[] start, byte[] limit) { public byte[] findShortestSeparator(byte[] start, byte[] limit) {
@ -164,6 +132,7 @@ public byte[] findShortSuccessor(byte[] key) {
return key; return key;
} }
}); });
LOG.info("Using conf database at " + storeRoot); LOG.info("Using conf database at " + storeRoot);
File dbfile = new File(storeRoot.toString()); File dbfile = new File(storeRoot.toString());
try { try {
@ -179,7 +148,6 @@ public byte[] findShortSuccessor(byte[] key) {
for (Map.Entry<String, String> kv : config) { for (Map.Entry<String, String> kv : config) {
initBatch.put(bytes(kv.getKey()), bytes(kv.getValue())); initBatch.put(bytes(kv.getKey()), bytes(kv.getValue()));
} }
initBatch.put(bytes(LOG_COMMITTED_TXN), bytes("0"));
db.write(initBatch); db.write(initBatch);
} catch (DBException dbErr) { } catch (DBException dbErr) {
throw new IOException(dbErr.getMessage(), dbErr); throw new IOException(dbErr.getMessage(), dbErr);
@ -208,28 +176,22 @@ private Path getStorageDir() throws IOException {
} }
@Override @Override
public synchronized long logMutation(LogMutation logMutation) public void logMutation(LogMutation logMutation) throws IOException {
throws IOException { LinkedList<LogMutation> logs = deserLogMutations(db.get(bytes(LOG_KEY)));
logMutation.setId(++txnId); logs.add(logMutation);
WriteBatch logBatch = db.createWriteBatch(); if (logs.size() > maxLogs) {
logBatch.put(bytes(LOG_PREFIX + txnId), serLogMutation(logMutation)); logs.removeFirst();
if (txnId - minTxn >= maxLogs) {
logBatch.delete(bytes(LOG_PREFIX + minTxn));
minTxn++;
} }
db.write(logBatch); db.put(bytes(LOG_KEY), serLogMutations(logs));
pendingMutations.add(logMutation); pendingMutation = logMutation;
return txnId;
} }
@Override @Override
public synchronized boolean confirmMutation(long id, boolean isValid) public void confirmMutation(boolean isValid) throws IOException {
throws IOException {
WriteBatch updateBatch = db.createWriteBatch(); WriteBatch updateBatch = db.createWriteBatch();
if (isValid) { if (isValid) {
LogMutation mutation = deserLogMutation(db.get(bytes(LOG_PREFIX + id)));
for (Map.Entry<String, String> changes : for (Map.Entry<String, String> changes :
mutation.getUpdates().entrySet()) { pendingMutation.getUpdates().entrySet()) {
if (changes.getValue() == null || changes.getValue().isEmpty()) { if (changes.getValue() == null || changes.getValue().isEmpty()) {
updateBatch.delete(bytes(changes.getKey())); updateBatch.delete(bytes(changes.getKey()));
} else { } else {
@ -237,28 +199,24 @@ public synchronized boolean confirmMutation(long id, boolean isValid)
} }
} }
} }
updateBatch.put(bytes(LOG_COMMITTED_TXN), bytes(String.valueOf(id)));
db.write(updateBatch); db.write(updateBatch);
// Assumes logMutation and confirmMutation are done in the same pendingMutation = null;
// synchronized method. For example,
// {@link MutableCSConfigurationProvider#mutateConfiguration(
// UserGroupInformation user, SchedConfUpdateInfo confUpdate)}
pendingMutations.removeFirst();
return true;
} }
private byte[] serLogMutation(LogMutation mutation) throws IOException { private byte[] serLogMutations(LinkedList<LogMutation> mutations) throws
IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream(); ByteArrayOutputStream baos = new ByteArrayOutputStream();
try (ObjectOutput oos = new ObjectOutputStream(baos)) { try (ObjectOutput oos = new ObjectOutputStream(baos)) {
oos.writeObject(mutation); oos.writeObject(mutations);
oos.flush(); oos.flush();
return baos.toByteArray(); return baos.toByteArray();
} }
} }
private LogMutation deserLogMutation(byte[] mutation) throws IOException { private LinkedList<LogMutation> deserLogMutations(byte[] mutations) throws
IOException {
try (ObjectInput input = new ObjectInputStream( try (ObjectInput input = new ObjectInputStream(
new ByteArrayInputStream(mutation))) { new ByteArrayInputStream(mutations))) {
return (LogMutation) input.readObject(); return (LinkedList<LogMutation>) input.readObject();
} catch (ClassNotFoundException e) { } catch (ClassNotFoundException e) {
throw new IOException(e); throw new IOException(e);
} }
@ -267,7 +225,7 @@ private LogMutation deserLogMutation(byte[] mutation) throws IOException {
@Override @Override
public synchronized Configuration retrieve() { public synchronized Configuration retrieve() {
DBIterator itr = db.iterator(); DBIterator itr = db.iterator();
itr.seek(bytes(LOG_COMMITTED_TXN)); itr.seek(bytes(LOG_KEY));
Configuration config = new Configuration(false); Configuration config = new Configuration(false);
itr.next(); itr.next();
while (itr.hasNext()) { while (itr.hasNext()) {
@ -278,11 +236,6 @@ public synchronized Configuration retrieve() {
return config; return config;
} }
@Override
public List<LogMutation> getPendingMutations() {
return new LinkedList<>(pendingMutations);
}
@Override @Override
public List<LogMutation> getConfirmedConfHistory(long fromId) { public List<LogMutation> getConfirmedConfHistory(long fromId) {
return null; // unimplemented return null; // unimplemented
@ -299,6 +252,39 @@ private void startCompactionTimer() {
} }
} }
// TODO: following is taken from LeveldbRMStateStore
@Override
public Version getConfStoreVersion() throws Exception {
Version version = null;
try {
byte[] data = db.get(bytes(VERSION_KEY));
if (data != null) {
version = new VersionPBImpl(YarnServerCommonProtos.VersionProto
.parseFrom(data));
}
} catch (DBException e) {
throw new IOException(e);
}
return version;
}
@Override
public void storeVersion() throws Exception {
String key = VERSION_KEY;
byte[] data = ((VersionPBImpl) CURRENT_VERSION_INFO).getProto()
.toByteArray();
try {
db.put(bytes(key), data);
} catch (DBException e) {
throw new IOException(e);
}
}
@Override
public Version getCurrentVersion() {
return CURRENT_VERSION_INFO;
}
private class CompactionTimerTask extends TimerTask { private class CompactionTimerTask extends TimerTask {
@Override @Override
public void run() { public void run() {

View File

@ -18,20 +18,17 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf; package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner; import com.google.common.base.Joiner;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ConfigurationMutationACLPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ConfigurationMutationACLPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ConfigurationMutationACLPolicyFactory; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ConfigurationMutationACLPolicyFactory;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfigurationProvider; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfigurationProvider;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.YarnConfigurationStore.LogMutation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.YarnConfigurationStore.LogMutation;
import org.apache.hadoop.yarn.webapp.dao.QueueConfigInfo; import org.apache.hadoop.yarn.webapp.dao.QueueConfigInfo;
@ -56,6 +53,7 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider,
LogFactory.getLog(MutableCSConfigurationProvider.class); LogFactory.getLog(MutableCSConfigurationProvider.class);
private Configuration schedConf; private Configuration schedConf;
private Configuration oldConf;
private YarnConfigurationStore confStore; private YarnConfigurationStore confStore;
private ConfigurationMutationACLPolicy aclMutationPolicy; private ConfigurationMutationACLPolicy aclMutationPolicy;
private RMContext rmContext; private RMContext rmContext;
@ -76,6 +74,9 @@ public void init(Configuration config) throws IOException {
case YarnConfiguration.LEVELDB_CONFIGURATION_STORE: case YarnConfiguration.LEVELDB_CONFIGURATION_STORE:
this.confStore = new LeveldbConfigurationStore(); this.confStore = new LeveldbConfigurationStore();
break; break;
case YarnConfiguration.ZK_CONFIGURATION_STORE:
this.confStore = new ZKConfigurationStore();
break;
default: default:
this.confStore = YarnConfigurationStoreFactory.getStore(config); this.confStore = YarnConfigurationStoreFactory.getStore(config);
break; break;
@ -89,7 +90,11 @@ public void init(Configuration config) throws IOException {
for (Map.Entry<String, String> kv : initialSchedConf) { for (Map.Entry<String, String> kv : initialSchedConf) {
schedConf.set(kv.getKey(), kv.getValue()); schedConf.set(kv.getKey(), kv.getValue());
} }
confStore.initialize(config, schedConf); try {
confStore.initialize(config, schedConf, rmContext);
} catch (Exception e) {
throw new IOException(e);
}
// After initializing confStore, the store may already have an existing // After initializing confStore, the store may already have an existing
// configuration. Use this one. // configuration. Use this one.
schedConf = confStore.retrieve(); schedConf = confStore.retrieve();
@ -98,6 +103,11 @@ public void init(Configuration config) throws IOException {
aclMutationPolicy.init(config, rmContext); aclMutationPolicy.init(config, rmContext);
} }
/**
 * Expose the backing configuration store selected in {@code init}.
 * Annotated {@code @VisibleForTesting}: intended for tests only.
 * @return the configuration store held by this provider
 */
@VisibleForTesting
public YarnConfigurationStore getConfStore() {
return confStore;
}
@Override @Override
public CapacitySchedulerConfiguration loadConfiguration(Configuration public CapacitySchedulerConfiguration loadConfiguration(Configuration
configuration) throws IOException { configuration) throws IOException {
@ -107,16 +117,17 @@ public CapacitySchedulerConfiguration loadConfiguration(Configuration
} }
@Override @Override
public synchronized void mutateConfiguration(UserGroupInformation user, public ConfigurationMutationACLPolicy getAclMutationPolicy() {
SchedConfUpdateInfo confUpdate) throws IOException, YarnException { return aclMutationPolicy;
if (!aclMutationPolicy.isMutationAllowed(user, confUpdate)) { }
throw new AccessControlException("User is not admin of all modified" +
" queues."); @Override
} public void logAndApplyMutation(UserGroupInformation user,
Configuration oldConf = new Configuration(schedConf); SchedConfUpdateInfo confUpdate) throws Exception {
oldConf = new Configuration(schedConf);
Map<String, String> kvUpdate = constructKeyValueConfUpdate(confUpdate); Map<String, String> kvUpdate = constructKeyValueConfUpdate(confUpdate);
LogMutation log = new LogMutation(kvUpdate, user.getShortUserName()); LogMutation log = new LogMutation(kvUpdate, user.getShortUserName());
long id = confStore.logMutation(log); confStore.logMutation(log);
for (Map.Entry<String, String> kv : kvUpdate.entrySet()) { for (Map.Entry<String, String> kv : kvUpdate.entrySet()) {
if (kv.getValue() == null) { if (kv.getValue() == null) {
schedConf.unset(kv.getKey()); schedConf.unset(kv.getKey());
@ -124,47 +135,33 @@ public synchronized void mutateConfiguration(UserGroupInformation user,
schedConf.set(kv.getKey(), kv.getValue()); schedConf.set(kv.getKey(), kv.getValue());
} }
} }
try {
rmContext.getRMAdminService().refreshQueues();
} catch (IOException | YarnException e) {
schedConf = oldConf;
confStore.confirmMutation(id, false);
throw e;
}
confStore.confirmMutation(id, true);
} }
@Override @Override
public void recoverConf() throws IOException { public void confirmPendingMutation(boolean isValid) throws Exception {
List<LogMutation> uncommittedLogs = confStore.getPendingMutations(); confStore.confirmMutation(isValid);
Configuration oldConf = new Configuration(schedConf); if (!isValid) {
for (LogMutation mutation : uncommittedLogs) { schedConf = oldConf;
for (Map.Entry<String, String> kv : mutation.getUpdates().entrySet()) {
if (kv.getValue() == null) {
schedConf.unset(kv.getKey());
} else {
schedConf.set(kv.getKey(), kv.getValue());
}
}
try {
rmContext.getScheduler().reinitialize(schedConf, rmContext);
} catch (IOException e) {
schedConf = oldConf;
confStore.confirmMutation(mutation.getId(), false);
LOG.info("Configuration mutation " + mutation.getId()
+ " was rejected", e);
continue;
}
confStore.confirmMutation(mutation.getId(), true);
LOG.info("Configuration mutation " + mutation.getId()+ " was accepted");
} }
} }
/**
 * Replace the in-memory scheduler configuration with the one currently
 * persisted in the configuration store.
 *
 * A store may return {@code null} from {@code retrieve()} when it cannot
 * read or deserialize its persisted state. In that case the previous
 * in-memory configuration is left untouched and the failure is reported
 * here, rather than surfacing later as a NullPointerException.
 * @throws Exception if the store yields no configuration
 */
@Override
public void reloadConfigurationFromStore() throws Exception {
  Configuration storedConf = confStore.retrieve();
  if (storedConf == null) {
    throw new IOException("Failed to reload scheduler configuration: the"
        + " configuration store returned no configuration");
  }
  schedConf = storedConf;
}
/**
 * Look up the child queues configured for the parent of the given queue.
 * The returned list therefore includes the queue itself when it is
 * already configured.
 * @param queuePath full queue path; must contain a '.', otherwise the
 *                  parent-path substring call throws
 * @param conf configuration to read the parent's "queues" property from
 * @return a new mutable list holding the parent's configured child queues
 */
private List<String> getSiblingQueues(String queuePath, Configuration conf) {
  final int parentEnd = queuePath.lastIndexOf('.');
  final String queuesKey = CapacitySchedulerConfiguration.PREFIX
      + queuePath.substring(0, parentEnd)
      + CapacitySchedulerConfiguration.DOT
      + CapacitySchedulerConfiguration.QUEUES;
  List<String> siblings = new ArrayList<>();
  siblings.addAll(conf.getStringCollection(queuesKey));
  return siblings;
}
private Map<String, String> constructKeyValueConfUpdate( private Map<String, String> constructKeyValueConfUpdate(
SchedConfUpdateInfo mutationInfo) throws IOException { SchedConfUpdateInfo mutationInfo) throws IOException {
CapacityScheduler cs = (CapacityScheduler) rmContext.getScheduler();
CapacitySchedulerConfiguration proposedConf = CapacitySchedulerConfiguration proposedConf =
new CapacitySchedulerConfiguration(cs.getConfiguration(), false); new CapacitySchedulerConfiguration(schedConf, false);
Map<String, String> confUpdate = new HashMap<>(); Map<String, String> confUpdate = new HashMap<>();
for (String queueToRemove : mutationInfo.getRemoveQueueInfo()) { for (String queueToRemove : mutationInfo.getRemoveQueueInfo()) {
removeQueue(queueToRemove, proposedConf, confUpdate); removeQueue(queueToRemove, proposedConf, confUpdate);
@ -188,40 +185,35 @@ private void removeQueue(
if (queueToRemove == null) { if (queueToRemove == null) {
return; return;
} else { } else {
CapacityScheduler cs = (CapacityScheduler) rmContext.getScheduler();
String queueName = queueToRemove.substring( String queueName = queueToRemove.substring(
queueToRemove.lastIndexOf('.') + 1); queueToRemove.lastIndexOf('.') + 1);
CSQueue queue = cs.getQueue(queueName); if (queueToRemove.lastIndexOf('.') == -1) {
if (queue == null ||
!queue.getQueuePath().equals(queueToRemove)) {
throw new IOException("Queue " + queueToRemove + " not found");
} else if (queueToRemove.lastIndexOf('.') == -1) {
throw new IOException("Can't remove queue " + queueToRemove); throw new IOException("Can't remove queue " + queueToRemove);
}
String parentQueuePath = queueToRemove.substring(0, queueToRemove
.lastIndexOf('.'));
String[] siblingQueues = proposedConf.getQueues(parentQueuePath);
List<String> newSiblingQueues = new ArrayList<>();
for (String siblingQueue : siblingQueues) {
if (!siblingQueue.equals(queueName)) {
newSiblingQueues.add(siblingQueue);
}
}
proposedConf.setQueues(parentQueuePath, newSiblingQueues
.toArray(new String[0]));
String queuesConfig = CapacitySchedulerConfiguration.PREFIX
+ parentQueuePath + CapacitySchedulerConfiguration.DOT
+ CapacitySchedulerConfiguration.QUEUES;
if (newSiblingQueues.size() == 0) {
confUpdate.put(queuesConfig, null);
} else { } else {
confUpdate.put(queuesConfig, Joiner.on(',').join(newSiblingQueues)); List<String> siblingQueues = getSiblingQueues(queueToRemove,
} proposedConf);
for (Map.Entry<String, String> confRemove : proposedConf.getValByRegex( if (!siblingQueues.contains(queueName)) {
".*" + queueToRemove.replaceAll("\\.", "\\.") + "\\..*") throw new IOException("Queue " + queueToRemove + " not found");
.entrySet()) { }
proposedConf.unset(confRemove.getKey()); siblingQueues.remove(queueName);
confUpdate.put(confRemove.getKey(), null); String parentQueuePath = queueToRemove.substring(0, queueToRemove
.lastIndexOf('.'));
proposedConf.setQueues(parentQueuePath, siblingQueues.toArray(
new String[0]));
String queuesConfig = CapacitySchedulerConfiguration.PREFIX
+ parentQueuePath + CapacitySchedulerConfiguration.DOT
+ CapacitySchedulerConfiguration.QUEUES;
if (siblingQueues.size() == 0) {
confUpdate.put(queuesConfig, null);
} else {
confUpdate.put(queuesConfig, Joiner.on(',').join(siblingQueues));
}
for (Map.Entry<String, String> confRemove : proposedConf.getValByRegex(
".*" + queueToRemove.replaceAll("\\.", "\\.") + "\\..*")
.entrySet()) {
proposedConf.unset(confRemove.getKey());
confUpdate.put(confRemove.getKey(), null);
}
} }
} }
} }
@ -232,13 +224,13 @@ private void addQueue(
if (addInfo == null) { if (addInfo == null) {
return; return;
} else { } else {
CapacityScheduler cs = (CapacityScheduler) rmContext.getScheduler();
String queuePath = addInfo.getQueue(); String queuePath = addInfo.getQueue();
String queueName = queuePath.substring(queuePath.lastIndexOf('.') + 1); String queueName = queuePath.substring(queuePath.lastIndexOf('.') + 1);
if (cs.getQueue(queueName) != null) { if (queuePath.lastIndexOf('.') == -1) {
throw new IOException("Can't add existing queue " + queuePath);
} else if (queuePath.lastIndexOf('.') == -1) {
throw new IOException("Can't add invalid queue " + queuePath); throw new IOException("Can't add invalid queue " + queuePath);
} else if (getSiblingQueues(queuePath, proposedConf).contains(
queueName)) {
throw new IOException("Can't add existing queue " + queuePath);
} }
String parentQueue = queuePath.substring(0, queuePath.lastIndexOf('.')); String parentQueue = queuePath.substring(0, queuePath.lastIndexOf('.'));
String[] siblings = proposedConf.getQueues(parentQueue); String[] siblings = proposedConf.getQueues(parentQueue);

View File

@ -18,7 +18,12 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf; package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateVersionIncompatibleException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import java.io.IOException; import java.io.IOException;
@ -39,36 +44,26 @@
* {@code getPendingMutations}, and replay/confirm them via * {@code getPendingMutations}, and replay/confirm them via
* {@code confirmMutation} as in the normal case. * {@code confirmMutation} as in the normal case.
*/ */
public interface YarnConfigurationStore { public abstract class YarnConfigurationStore {
public static final Log LOG =
LogFactory.getLog(YarnConfigurationStore.class);
/** /**
* LogMutation encapsulates the fields needed for configuration mutation * LogMutation encapsulates the fields needed for configuration mutation
* audit logging and recovery. * audit logging and recovery.
*/ */
class LogMutation implements Serializable { static class LogMutation implements Serializable {
private Map<String, String> updates; private Map<String, String> updates;
private String user; private String user;
private long id;
/** /**
* Create log mutation prior to logging. * Create log mutation.
* @param updates key-value configuration updates * @param updates key-value configuration updates
* @param user user who requested configuration change * @param user user who requested configuration change
*/ */
public LogMutation(Map<String, String> updates, String user) { LogMutation(Map<String, String> updates, String user) {
this(updates, user, 0);
}
/**
* Create log mutation for recovery.
* @param updates key-value configuration updates
* @param user user who requested configuration change
* @param id transaction id of configuration change
*/
LogMutation(Map<String, String> updates, String user, long id) {
this.updates = updates; this.updates = updates;
this.user = user; this.user = user;
this.id = id;
} }
/** /**
@ -86,75 +81,92 @@ public Map<String, String> getUpdates() {
public String getUser() { public String getUser() {
return user; return user;
} }
/**
* Get transaction id of this configuration change.
* @return transaction id
*/
public long getId() {
return id;
}
/**
* Set transaction id of this configuration change.
* @param id transaction id
*/
public void setId(long id) {
this.id = id;
}
} }
/** /**
* Initialize the configuration store. * Initialize the configuration store, with schedConf as the initial
* scheduler configuration. If a persisted store already exists, use the
* scheduler configuration stored there, and ignore schedConf.
* @param conf configuration to initialize store with * @param conf configuration to initialize store with
* @param schedConf Initial key-value configuration to persist * @param schedConf Initial key-value scheduler configuration to persist.
* @param rmContext RMContext for this configuration store
* @throws IOException if initialization fails * @throws IOException if initialization fails
*/ */
void initialize(Configuration conf, Configuration schedConf) public abstract void initialize(Configuration conf, Configuration schedConf,
throws IOException; RMContext rmContext) throws Exception;
/** /**
* Logs the configuration change to backing store. Generates an id associated * Logs the configuration change to backing store.
* with this mutation, sets it in {@code logMutation}, and returns it.
* @param logMutation configuration change to be persisted in write ahead log * @param logMutation configuration change to be persisted in write ahead log
* @return id which configuration store associates with this mutation
* @throws IOException if logging fails * @throws IOException if logging fails
*/ */
long logMutation(LogMutation logMutation) throws IOException; public abstract void logMutation(LogMutation logMutation) throws Exception;
/** /**
* Should be called after {@code logMutation}. Gets the pending mutation * Should be called after {@code logMutation}. Gets the pending mutation
* associated with {@code id} and marks the mutation as persisted (no longer * last logged by {@code logMutation} and marks the mutation as persisted (no
* pending). If isValid is true, merge the mutation with the persisted * longer pending). If isValid is true, merge the mutation with the persisted
* configuration. * configuration.
* * @param isValid if true, update persisted configuration with pending
* If {@code confirmMutation} is called with ids in a different order than * mutation.
* was returned by {@code logMutation}, the result is implementation * @throws Exception if mutation confirmation fails
* dependent.
* @param id id of mutation to be confirmed
* @param isValid if true, update persisted configuration with mutation
* associated with {@code id}.
* @return true on success
* @throws IOException if mutation confirmation fails
*/ */
boolean confirmMutation(long id, boolean isValid) throws IOException; public abstract void confirmMutation(boolean isValid) throws Exception;
/** /**
* Retrieve the persisted configuration. * Retrieve the persisted configuration.
* @return configuration as key-value * @return configuration as key-value
*/ */
Configuration retrieve(); public abstract Configuration retrieve();
/**
* Get the list of pending mutations, in the order they were logged.
* @return list of mutations
*/
List<LogMutation> getPendingMutations();
/** /**
* Get a list of confirmed configuration mutations starting from a given id. * Get a list of confirmed configuration mutations starting from a given id.
* @param fromId id from which to start getting mutations, inclusive * @param fromId id from which to start getting mutations, inclusive
* @return list of configuration mutations * @return list of configuration mutations
*/ */
List<LogMutation> getConfirmedConfHistory(long fromId); public abstract List<LogMutation> getConfirmedConfHistory(long fromId);
/**
* Get schema version of persisted conf store, for detecting compatibility
* issues when changing conf store schema.
* @return Schema version currently used by the persisted configuration store.
* @throws Exception On version fetch failure
*/
protected abstract Version getConfStoreVersion() throws Exception;
/**
* Persist the hard-coded schema version to the conf store.
* @throws Exception On storage failure
*/
protected abstract void storeVersion() throws Exception;
/**
* Get the hard-coded schema version, for comparison against the schema
* version currently persisted.
* @return Current hard-coded schema version
*/
protected abstract Version getCurrentVersion();
/**
 * Compare the schema version persisted in the store with the hard-coded
 * current version. An identical version is accepted as is; a missing
 * version is treated as the current one; a compatible version causes the
 * current version to be persisted; anything else is rejected.
 * @throws Exception if the versions are incompatible, or if reading or
 *                   storing the version fails
 */
public void checkVersion() throws Exception {
  // TODO this was taken from RMStateStore. Should probably refactor
  Version persisted = getConfStoreVersion();
  LOG.info("Loaded configuration store version info " + persisted);
  if (persisted == null) {
    // No version info in the store yet: treat it as the current version.
    persisted = getCurrentVersion();
  } else if (persisted.equals(getCurrentVersion())) {
    return;
  }
  if (!persisted.isCompatibleTo(getCurrentVersion())) {
    throw new RMStateVersionIncompatibleException(
        "Expecting configuration store version " + getCurrentVersion()
            + ", but loading version " + persisted);
  }
  LOG.info("Storing configuration store version info "
      + getCurrentVersion());
  storeVersion();
}
} }

View File

@ -0,0 +1,235 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.curator.ZKCuratorManager;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.ACL;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
/**
 * A Zookeeper-based implementation of {@link YarnConfigurationStore}.
 *
 * Three znodes are kept under the configured parent path: a schema version
 * node, a node holding the serialized list of logged mutations, and a node
 * holding the serialized key-value scheduler configuration. Lists and maps
 * are stored using Java object serialization.
 */
public class ZKConfigurationStore extends YarnConfigurationStore {

  public static final Log LOG =
      LogFactory.getLog(ZKConfigurationStore.class);

  /** Maximum number of mutations retained in the logs znode. */
  private long maxLogs;

  @VisibleForTesting
  protected static final Version CURRENT_VERSION_INFO = Version
      .newInstance(0, 1);
  private Configuration conf;
  /** Mutation most recently passed to logMutation and not yet confirmed. */
  private LogMutation pendingMutation;
  private String znodeParentPath;

  private static final String ZK_VERSION_PATH = "VERSION";
  private static final String LOGS_PATH = "LOGS";
  private static final String CONF_STORE_PATH = "CONF_STORE";
  private static final String FENCING_PATH = "FENCING";

  private String zkVersionPath;
  private String logsPath;
  private String confStorePath;
  private String fencingNodePath;

  @VisibleForTesting
  protected ZKCuratorManager zkManager;
  private List<ACL> zkAcl;

  /**
   * Set up the znode layout under the configured parent path. The logs and
   * configuration znodes are only created (and seeded with an empty log and
   * with {@code schedConf} respectively) when they do not exist yet, so an
   * already persisted configuration takes precedence over {@code schedConf}.
   * @param config configuration to read store settings from
   * @param schedConf initial scheduler configuration to persist
   * @param rmContext context used to obtain the Zookeeper manager
   * @throws Exception if any Zookeeper operation or serialization fails
   */
  @Override
  public void initialize(Configuration config, Configuration schedConf,
      RMContext rmContext) throws Exception {
    this.conf = config;
    this.maxLogs = conf.getLong(YarnConfiguration.RM_SCHEDCONF_MAX_LOGS,
        YarnConfiguration.DEFAULT_RM_SCHEDCONF_ZK_MAX_LOGS);
    this.znodeParentPath =
        conf.get(YarnConfiguration.RM_SCHEDCONF_STORE_ZK_PARENT_PATH,
            YarnConfiguration.DEFAULT_RM_SCHEDCONF_STORE_ZK_PARENT_PATH);
    this.zkManager = rmContext.getResourceManager().getAndStartZKManager(conf);
    this.zkAcl = ZKCuratorManager.getZKAcls(conf);

    this.zkVersionPath = getNodePath(znodeParentPath, ZK_VERSION_PATH);
    this.logsPath = getNodePath(znodeParentPath, LOGS_PATH);
    this.confStorePath = getNodePath(znodeParentPath, CONF_STORE_PATH);
    this.fencingNodePath = getNodePath(znodeParentPath, FENCING_PATH);

    zkManager.createRootDirRecursively(znodeParentPath);
    zkManager.delete(fencingNodePath);

    if (!zkManager.exists(logsPath)) {
      zkManager.create(logsPath);
      zkManager.setData(logsPath,
          serializeObject(new LinkedList<LogMutation>()), -1);
    }

    if (!zkManager.exists(confStorePath)) {
      zkManager.create(confStorePath);
      HashMap<String, String> mapSchedConf = new HashMap<>();
      for (Map.Entry<String, String> entry : schedConf) {
        mapSchedConf.put(entry.getKey(), entry.getValue());
      }
      zkManager.setData(confStorePath, serializeObject(mapSchedConf), -1);
    }
  }

  /**
   * Read and deserialize the mutation log stored in the logs znode.
   * @return the persisted list of logged mutations
   * @throws Exception if the data cannot be read or deserialized
   */
  @VisibleForTesting
  @SuppressWarnings("unchecked") // the logs znode is only written by this class
  protected LinkedList<LogMutation> getLogs() throws Exception {
    return (LinkedList<LogMutation>)
        deserializeObject(zkManager.getData(logsPath));
  }

  // TODO: following version-related code is taken from ZKRMStateStore
  @Override
  public Version getCurrentVersion() {
    return CURRENT_VERSION_INFO;
  }

  /**
   * Read the schema version persisted in the version znode.
   * @return the persisted version, or {@code null} if the znode is absent
   * @throws Exception if the znode cannot be read or parsed
   */
  @Override
  public Version getConfStoreVersion() throws Exception {
    if (zkManager.exists(zkVersionPath)) {
      byte[] data = zkManager.getData(zkVersionPath);
      return new VersionPBImpl(YarnServerCommonProtos.VersionProto
          .parseFrom(data));
    }
    return null;
  }

  /**
   * Persist {@link #CURRENT_VERSION_INFO} to the version znode, creating the
   * znode when it does not exist yet.
   * @throws Exception if the Zookeeper write fails
   */
  @Override
  public synchronized void storeVersion() throws Exception {
    byte[] data =
        ((VersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray();

    if (zkManager.exists(zkVersionPath)) {
      zkManager.safeSetData(zkVersionPath, data, -1, zkAcl, fencingNodePath);
    } else {
      zkManager.safeCreate(zkVersionPath, data, zkAcl, CreateMode.PERSISTENT,
          zkAcl, fencingNodePath);
    }
  }

  /**
   * Append the mutation to the persisted log, discard the oldest entries
   * beyond {@code maxLogs}, and remember the mutation as the pending one for
   * a subsequent {@link #confirmMutation(boolean)}.
   * @param logMutation configuration change to be persisted
   * @throws Exception if the log cannot be read, serialized or written
   */
  @Override
  public void logMutation(LogMutation logMutation) throws Exception {
    byte[] storedLogs = zkManager.getData(logsPath);
    LinkedList<LogMutation> logs = new LinkedList<>();
    if (storedLogs != null) {
      @SuppressWarnings("unchecked") // logs znode is only written by this class
      LinkedList<LogMutation> stored =
          (LinkedList<LogMutation>) deserializeObject(storedLogs);
      logs = stored;
    }
    logs.add(logMutation);
    // Drop oldest entries until the log fits. A loop (rather than a single
    // removal) lets the log shrink when maxLogs was lowered since the last
    // write. removeFirst() already removes the head, so no second
    // remove(Object) call is needed.
    while (!logs.isEmpty() && logs.size() > maxLogs) {
      logs.removeFirst();
    }
    zkManager.safeSetData(logsPath, serializeObject(logs), -1, zkAcl,
        fencingNodePath);
    pendingMutation = logMutation;
  }

  /**
   * Confirm or discard the mutation last passed to {@code logMutation}.
   * When valid, the pending updates are merged into the persisted
   * configuration; an update whose value is {@code null} or empty removes
   * the key. The pending mutation is cleared once this method completes
   * normally.
   * @param isValid if true, update persisted configuration with the pending
   *                mutation
   * @throws IllegalStateException if {@code isValid} is true and there is no
   *                               pending mutation
   * @throws Exception if the persisted configuration cannot be read or
   *                   written
   */
  @Override
  public void confirmMutation(boolean isValid)
      throws Exception {
    if (isValid) {
      if (pendingMutation == null) {
        throw new IllegalStateException("No pending mutation to confirm;"
            + " logMutation must be called first");
      }
      Configuration storedConfigs = retrieve();
      if (storedConfigs == null) {
        // retrieve() already logged the underlying cause.
        throw new java.io.IOException("Unable to read the persisted scheduler"
            + " configuration from " + confStorePath);
      }
      Map<String, String> mapConf = new HashMap<>();
      for (Map.Entry<String, String> storedConf : storedConfigs) {
        mapConf.put(storedConf.getKey(), storedConf.getValue());
      }
      for (Map.Entry<String, String> confChange :
          pendingMutation.getUpdates().entrySet()) {
        if (confChange.getValue() == null || confChange.getValue().isEmpty()) {
          mapConf.remove(confChange.getKey());
        } else {
          mapConf.put(confChange.getKey(), confChange.getValue());
        }
      }
      zkManager.safeSetData(confStorePath, serializeObject(mapConf), -1,
          zkAcl, fencingNodePath);
    }
    pendingMutation = null;
  }

  /**
   * Read the persisted scheduler configuration from Zookeeper.
   * @return the persisted configuration, or {@code null} if it could not be
   *         read or deserialized (the failure is logged)
   */
  @Override
  public synchronized Configuration retrieve() {
    byte[] serializedSchedConf;
    try {
      serializedSchedConf = zkManager.getData(confStorePath);
    } catch (Exception e) {
      LOG.error("Failed to retrieve configuration from zookeeper store", e);
      return null;
    }
    try {
      @SuppressWarnings("unchecked") // conf znode is only written by this class
      Map<String, String> map =
          (HashMap<String, String>) deserializeObject(serializedSchedConf);
      Configuration c = new Configuration();
      for (Map.Entry<String, String> e : map.entrySet()) {
        c.set(e.getKey(), e.getValue());
      }
      return c;
    } catch (Exception e) {
      LOG.error("Exception while deserializing scheduler configuration " +
          "from store", e);
    }
    return null;
  }

  @Override
  public List<LogMutation> getConfirmedConfHistory(long fromId) {
    return null; // unimplemented
  }

  private static String getNodePath(String root, String nodeName) {
    return ZKCuratorManager.getNodePath(root, nodeName);
  }

  /** Serialize an object to bytes using Java object serialization. */
  private static byte[] serializeObject(Object o) throws Exception {
    try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
         ObjectOutputStream oos = new ObjectOutputStream(baos);) {
      oos.writeObject(o);
      oos.flush();
      baos.flush();
      return baos.toByteArray();
    }
  }

  /**
   * Deserialize an object previously written by {@code serializeObject}.
   * NOTE(review): this is Java-native deserialization of znode contents; it
   * is only safe as long as write access to these znodes is restricted to
   * trusted principals -- confirm the ACL setup, or add an input filter.
   */
  private static Object deserializeObject(byte[] bytes) throws Exception {
    try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
         ObjectInputStream ois = new ObjectInputStream(bais);) {
      return ois.readObject();
    }
  }
}

View File

@ -136,6 +136,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfigurationProvider;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
@ -2464,7 +2465,7 @@ protected List<ContainerReport> getContainersReport(
@Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8, @Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 }) MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 })
@Consumes({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML }) @Consumes({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
public Response updateSchedulerConfiguration(SchedConfUpdateInfo public synchronized Response updateSchedulerConfiguration(SchedConfUpdateInfo
mutationInfo, @Context HttpServletRequest hsr) mutationInfo, @Context HttpServletRequest hsr)
throws AuthorizationException, InterruptedException { throws AuthorizationException, InterruptedException {
init(); init();
@ -2479,17 +2480,32 @@ public Response updateSchedulerConfiguration(SchedConfUpdateInfo
} }
ResourceScheduler scheduler = rm.getResourceScheduler(); ResourceScheduler scheduler = rm.getResourceScheduler();
if (scheduler instanceof MutableConfScheduler) { if (scheduler instanceof MutableConfScheduler && ((MutableConfScheduler)
scheduler).isConfigurationMutable()) {
try { try {
callerUGI.doAs(new PrivilegedExceptionAction<Void>() { callerUGI.doAs(new PrivilegedExceptionAction<Void>() {
@Override @Override
public Void run() throws IOException, YarnException { public Void run() throws Exception {
((MutableConfScheduler) scheduler).updateConfiguration(callerUGI, MutableConfigurationProvider provider = ((MutableConfScheduler)
mutationInfo); scheduler).getMutableConfProvider();
if (!provider.getAclMutationPolicy().isMutationAllowed(callerUGI,
mutationInfo)) {
throw new org.apache.hadoop.security.AccessControlException("User"
+ " is not admin of all modified queues.");
}
provider.logAndApplyMutation(callerUGI, mutationInfo);
try {
rm.getRMContext().getRMAdminService().refreshQueues();
} catch (IOException | YarnException e) {
provider.confirmPendingMutation(false);
throw e;
}
provider.confirmPendingMutation(true);
return null; return null;
} }
}); });
} catch (IOException e) { } catch (IOException e) {
LOG.error("Exception thrown when modifying configuration.", e);
return Response.status(Status.BAD_REQUEST).entity(e.getMessage()) return Response.status(Status.BAD_REQUEST).entity(e.getMessage())
.build(); .build();
} }

View File

@ -0,0 +1,90 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
/**
 * Base class for tests of {@link YarnConfigurationStore} implementations.
 * Subclasses supply the store under test via {@link #createConfStore()}.
 */
public abstract class ConfigurationStoreBaseTest {

  protected static final String TEST_USER = "testUser";

  protected YarnConfigurationStore confStore = createConfStore();
  protected Configuration conf;
  protected Configuration schedConf;
  protected RMContext rmContext;

  /**
   * Create the store implementation exercised by the tests.
   * @return a new, uninitialized configuration store
   */
  protected abstract YarnConfigurationStore createConfStore();

  @Before
  public void setUp() throws Exception {
    this.conf = new Configuration();
    this.schedConf = new Configuration(false);
  }

  /** A confirmed mutation must be persisted; a rejected one must not. */
  @Test
  public void testConfigurationUpdate() throws Exception {
    schedConf.set("key1", "val1");
    confStore.initialize(conf, schedConf, rmContext);
    assertEquals("val1", confStore.retrieve().get("key1"));

    logSingleUpdate("keyUpdate1", "valUpdate1");
    confStore.confirmMutation(true);
    assertEquals("valUpdate1", confStore.retrieve().get("keyUpdate1"));

    logSingleUpdate("keyUpdate2", "valUpdate2");
    confStore.confirmMutation(false);
    assertNull("Configuration should not be updated",
        confStore.retrieve().get("keyUpdate2"));
  }

  /** A confirmed update with a null value must remove the key. */
  @Test
  public void testNullConfigurationUpdate() throws Exception {
    schedConf.set("key", "val");
    confStore.initialize(conf, schedConf, rmContext);
    assertEquals("val", confStore.retrieve().get("key"));

    logSingleUpdate("key", null);
    confStore.confirmMutation(true);
    assertNull(confStore.retrieve().get("key"));
  }

  /** Log a one-entry mutation on behalf of {@link #TEST_USER}. */
  private void logSingleUpdate(String key, String value) throws Exception {
    Map<String, String> updates = new HashMap<>();
    updates.put(key, value);
    confStore.logMutation(
        new YarnConfigurationStore.LogMutation(updates, TEST_USER));
  }
}

View File

@ -0,0 +1,30 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
/**
 * Tests {@link InMemoryConfigurationStore} by plugging it into the test
 * cases inherited from {@link ConfigurationStoreBaseTest}.
 */
public class TestInMemoryConfigurationStore extends ConfigurationStoreBaseTest {

  /**
   * Supplies the store implementation exercised by this test class.
   *
   * @return a newly constructed {@link InMemoryConfigurationStore}
   */
  @Override
  protected YarnConfigurationStore createConfStore() {
    YarnConfigurationStore inMemoryStore = new InMemoryConfigurationStore();
    return inMemoryStore;
  }
}

View File

@ -20,7 +20,6 @@
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.AdminService; import org.apache.hadoop.yarn.server.resourcemanager.AdminService;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
@ -30,14 +29,11 @@
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import java.io.IOException;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNull;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
@ -82,25 +78,21 @@ public void setUp() {
} }
@Test @Test
public void testInMemoryBackedProvider() throws IOException, YarnException { public void testInMemoryBackedProvider() throws Exception {
Configuration conf = new Configuration(); Configuration conf = new Configuration();
confProvider.init(conf); confProvider.init(conf);
assertNull(confProvider.loadConfiguration(conf) assertNull(confProvider.loadConfiguration(conf)
.get("yarn.scheduler.capacity.root.a.goodKey")); .get("yarn.scheduler.capacity.root.a.goodKey"));
doNothing().when(adminService).refreshQueues(); confProvider.logAndApplyMutation(TEST_USER, goodUpdate);
confProvider.mutateConfiguration(TEST_USER, goodUpdate); confProvider.confirmPendingMutation(true);
assertEquals("goodVal", confProvider.loadConfiguration(conf) assertEquals("goodVal", confProvider.loadConfiguration(conf)
.get("yarn.scheduler.capacity.root.a.goodKey")); .get("yarn.scheduler.capacity.root.a.goodKey"));
assertNull(confProvider.loadConfiguration(conf).get( assertNull(confProvider.loadConfiguration(conf).get(
"yarn.scheduler.capacity.root.a.badKey")); "yarn.scheduler.capacity.root.a.badKey"));
doThrow(new IOException()).when(adminService).refreshQueues(); confProvider.logAndApplyMutation(TEST_USER, badUpdate);
try { confProvider.confirmPendingMutation(false);
confProvider.mutateConfiguration(TEST_USER, badUpdate);
} catch (IOException e) {
// Expected exception.
}
assertNull(confProvider.loadConfiguration(conf).get( assertNull(confProvider.loadConfiguration(conf).get(
"yarn.scheduler.capacity.root.a.badKey")); "yarn.scheduler.capacity.root.a.badKey"));
} }

View File

@ -1,71 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.YarnConfigurationStore.LogMutation;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
/**
 * Tests the log / confirm cycle of {@link InMemoryConfigurationStore} when
 * driven through a {@link YarnConfigurationStore} reference.
 */
public class TestYarnConfigurationStore {

  private static final String TEST_USER = "testUser";

  private YarnConfigurationStore confStore;
  private Configuration schedConf;

  /** Builds the initial scheduler configuration holding a single key. */
  @Before
  public void setUp() {
    schedConf = new Configuration(false);
    schedConf.set("key1", "val1");
  }

  /**
   * Logs one mutation that is confirmed as valid and one that is confirmed
   * as invalid, checking the stored configuration and the number of pending
   * mutations after each step.
   */
  @Test
  public void testInMemoryConfigurationStore() throws IOException {
    confStore = new InMemoryConfigurationStore();
    confStore.initialize(new Configuration(), schedConf);
    assertEquals("val1", confStore.retrieve().get("key1"));

    // First mutation: confirmed as valid, so it must become visible.
    Map<String, String> acceptedUpdate = new HashMap<>();
    acceptedUpdate.put("keyUpdate1", "valUpdate1");
    long mutationId =
        confStore.logMutation(new LogMutation(acceptedUpdate, TEST_USER));
    assertEquals(1, confStore.getPendingMutations().size());
    confStore.confirmMutation(mutationId, true);
    assertEquals("valUpdate1", confStore.retrieve().get("keyUpdate1"));
    assertEquals(0, confStore.getPendingMutations().size());

    // Second mutation: confirmed as invalid, so it must not be applied.
    Map<String, String> rejectedUpdate = new HashMap<>();
    rejectedUpdate.put("keyUpdate2", "valUpdate2");
    mutationId =
        confStore.logMutation(new LogMutation(rejectedUpdate, TEST_USER));
    assertEquals(1, confStore.getPendingMutations().size());
    confStore.confirmMutation(mutationId, false);
    assertNull("Configuration should not be updated",
        confStore.retrieve().get("keyUpdate2"));
    assertEquals(0, confStore.getPendingMutations().size());
  }
}

View File

@ -0,0 +1,312 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
import org.apache.curator.test.TestingServer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfigurationProvider;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
/**
* Tests {@link ZKConfigurationStore}.
*/
public class TestZKConfigurationStore extends ConfigurationStoreBaseTest {
/** Logger for this test class. */
public static final Log LOG =
LogFactory.getLog(TestZKConfigurationStore.class);
/**
 * Value, in milliseconds, used to derive the iteration bound of the loop in
 * testFailoverReadsFromUpdatedStore that waits for rm1 to leave ACTIVE state.
 */
private static final int ZK_TIMEOUT_MS = 10000;
/** Curator testing server created in setUp() and stopped in cleanup(). */
private TestingServer curatorTestingServer;
/** Curator client created and started in setUp(), closed in cleanup(). */
private CuratorFramework curatorFramework;
/** MockRM started in setUp() (it supplies rmContext), stopped in cleanup(). */
private ResourceManager rm;
/**
 * Creates a Curator {@link TestingServer}, calls {@code start()} on it and
 * hands it back to the caller.
 *
 * @return the testing server
 * @throws Exception if the server cannot be created or started
 */
public static TestingServer setupCuratorServer() throws Exception {
  TestingServer server = new TestingServer();
  server.start();
  return server;
}
/**
 * Builds and starts a Curator client connected to the given testing server.
 *
 * @param curatorTestingServer server whose connect string the client uses
 * @return the started client
 * @throws Exception if the client cannot be built or started
 */
public static CuratorFramework setupCuratorFramework(
    TestingServer curatorTestingServer) throws Exception {
  String connectString = curatorTestingServer.getConnectString();
  CuratorFrameworkFactory.Builder clientBuilder =
      CuratorFrameworkFactory.builder();
  clientBuilder.connectString(connectString);
  clientBuilder.retryPolicy(new RetryNTimes(100, 100));
  CuratorFramework client = clientBuilder.build();
  client.start();
  return client;
}
/**
 * Runs the base-class set-up, then starts a Curator testing server and
 * client, points {@code conf} at that server and starts a {@link MockRM}
 * whose context is stored in {@code rmContext}.
 */
@Before
public void setUp() throws Exception {
  super.setUp();

  // ZooKeeper side: testing server plus a client connected to it.
  TestingServer server = setupCuratorServer();
  curatorTestingServer = server;
  curatorFramework = setupCuratorFramework(server);

  // ResourceManager side: configure the ZK address before creating the RM.
  String zkAddress = server.getConnectString();
  conf.set(CommonConfigurationKeys.ZK_ADDRESS, zkAddress);
  rm = new MockRM(conf);
  rm.start();
  rmContext = rm.getRMContext();
}
/**
 * Releases everything created in {@code setUp()}: the RM, the Curator
 * client and the Curator testing server, in that order.
 *
 * <p>Each step is guarded against {@code null} because JUnit still invokes
 * {@code @After} methods when a {@code @Before} method fails part-way, and
 * the steps are chained with {@code finally} so that a failure while
 * stopping one resource does not leave the remaining ones running.
 *
 * @throws IOException if stopping the testing server fails
 */
@After
public void cleanup() throws IOException {
  try {
    if (rm != null) {
      rm.stop();
    }
  } finally {
    try {
      if (curatorFramework != null) {
        curatorFramework.close();
      }
    } finally {
      if (curatorTestingServer != null) {
        curatorTestingServer.stop();
      }
    }
  }
}
/**
 * Checks the version reported by the store: it is expected to be
 * {@code null} right after initialization and equal to
 * {@code ZKConfigurationStore.CURRENT_VERSION_INFO} once
 * {@code checkVersion()} has been called.
 */
@Test
public void testVersioning() throws Exception {
confStore.initialize(conf, schedConf, rmContext);
// Before checkVersion() the store is expected to report no version.
assertNull(confStore.getConfStoreVersion());
confStore.checkVersion();
// After checkVersion() the reported version must be the current one.
assertEquals(ZKConfigurationStore.CURRENT_VERSION_INFO,
confStore.getConfStoreVersion());
}
/**
 * Checks that a second store instance, initialized with a different
 * scheduler configuration, still returns the value written by the first
 * instance.
 */
@Test
public void testPersistConfiguration() throws Exception {
  final String key = "key";
  final String persistedValue = "val";

  schedConf.set(key, persistedValue);
  confStore.initialize(conf, schedConf, rmContext);
  assertEquals(persistedValue, confStore.retrieve().get(key));

  // Create a new configuration store, and check for old configuration.
  confStore = createConfStore();
  schedConf.set(key, "badVal");
  // The passed-in scheduler configuration should be ignored.
  confStore.initialize(conf, schedConf, rmContext);
  assertEquals(persistedValue, confStore.retrieve().get(key));
}
/**
 * Checks that a value written through a confirmed mutation is returned by a
 * second store instance, even when that instance is initialized with a
 * different scheduler configuration.
 */
@Test
public void testPersistUpdatedConfiguration() throws Exception {
  final String key = "key";
  final String updatedValue = "val";

  confStore.initialize(conf, schedConf, rmContext);
  assertNull(confStore.retrieve().get(key));

  // Write the value through a logged and confirmed mutation.
  Map<String, String> keyUpdate = new HashMap<>();
  keyUpdate.put(key, updatedValue);
  confStore.logMutation(
      new YarnConfigurationStore.LogMutation(keyUpdate, TEST_USER));
  confStore.confirmMutation(true);
  assertEquals(updatedValue, confStore.retrieve().get(key));

  // Create a new configuration store, and check for updated configuration.
  confStore = createConfStore();
  schedConf.set(key, "badVal");
  // The passed-in scheduler configuration should be ignored.
  confStore.initialize(conf, schedConf, rmContext);
  assertEquals(updatedValue, confStore.retrieve().get(key));
}
/**
 * Checks that the mutation log of {@link ZKConfigurationStore} holds at most
 * {@link YarnConfiguration#RM_SCHEDCONF_MAX_LOGS} entries (2 in this test):
 * a third logged mutation must push the oldest one out.
 *
 * <p>The log is re-read from the store after every {@code logMutation} and
 * every {@code confirmMutation} call, so each assertion inspects the store's
 * current log rather than a list fetched before the call.
 */
@Test
public void testMaxLogs() throws Exception {
  conf.setLong(YarnConfiguration.RM_SCHEDCONF_MAX_LOGS, 2);
  confStore.initialize(conf, schedConf, rmContext);
  ZKConfigurationStore zkStore = (ZKConfigurationStore) confStore;
  LinkedList<YarnConfigurationStore.LogMutation> logs = zkStore.getLogs();
  assertEquals(0, logs.size());

  // First update: log holds exactly that update, before and after confirm.
  Map<String, String> update1 = new HashMap<>();
  update1.put("key1", "val1");
  YarnConfigurationStore.LogMutation mutation =
      new YarnConfigurationStore.LogMutation(update1, TEST_USER);
  confStore.logMutation(mutation);
  logs = zkStore.getLogs();
  assertEquals(1, logs.size());
  assertEquals("val1", logs.get(0).getUpdates().get("key1"));
  confStore.confirmMutation(true);
  // Re-read: confirming is expected to leave the log untouched.
  logs = zkStore.getLogs();
  assertEquals(1, logs.size());
  assertEquals("val1", logs.get(0).getUpdates().get("key1"));

  // Second update: log is now full (two entries, oldest first).
  Map<String, String> update2 = new HashMap<>();
  update2.put("key2", "val2");
  mutation = new YarnConfigurationStore.LogMutation(update2, TEST_USER);
  confStore.logMutation(mutation);
  logs = zkStore.getLogs();
  assertEquals(2, logs.size());
  assertEquals("val1", logs.get(0).getUpdates().get("key1"));
  assertEquals("val2", logs.get(1).getUpdates().get("key2"));
  confStore.confirmMutation(true);
  logs = zkStore.getLogs();
  assertEquals(2, logs.size());
  assertEquals("val1", logs.get(0).getUpdates().get("key1"));
  assertEquals("val2", logs.get(1).getUpdates().get("key2"));

  // Next update should purge first update from logs.
  Map<String, String> update3 = new HashMap<>();
  update3.put("key3", "val3");
  mutation = new YarnConfigurationStore.LogMutation(update3, TEST_USER);
  confStore.logMutation(mutation);
  logs = zkStore.getLogs();
  assertEquals(2, logs.size());
  assertEquals("val2", logs.get(0).getUpdates().get("key2"));
  assertEquals("val3", logs.get(1).getUpdates().get("key3"));
  confStore.confirmMutation(true);
  logs = zkStore.getLogs();
  assertEquals(2, logs.size());
  assertEquals("val2", logs.get(0).getUpdates().get("key2"));
  assertEquals("val3", logs.get(1).getUpdates().get("key3"));
}
/**
 * Builds the configuration for one RM of an HA pair that uses the
 * store-backed capacity scheduler configuration provider with the ZK
 * configuration store, {@link ZKRMStateStore} for recovery, and the Curator
 * testing server of this test as ZooKeeper.
 *
 * @param rmIds value for {@code YarnConfiguration.RM_HA_IDS}
 * @param rmId id of the RM this configuration is for
 * @param adminPort port of the admin address set for {@code rmId}
 * @return the populated configuration
 */
public Configuration createRMHAConf(String rmIds, String rmId,
    int adminPort) {
  final String localhostAnyPort = "localhost:0";
  String zkAddress = curatorTestingServer.getConnectString();

  Configuration haConf = new YarnConfiguration();
  haConf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
  haConf.set(YarnConfiguration.RM_HA_IDS, rmIds);
  haConf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
  haConf.set(CapacitySchedulerConfiguration.CS_CONF_PROVIDER,
      CapacitySchedulerConfiguration.STORE_CS_CONF_PROVIDER);
  haConf.set(YarnConfiguration.SCHEDULER_CONFIGURATION_STORE_CLASS,
      YarnConfiguration.ZK_CONFIGURATION_STORE);
  haConf.set(YarnConfiguration.RM_STORE, ZKRMStateStore.class.getName());
  haConf.set(YarnConfiguration.RM_ZK_ADDRESS, zkAddress);
  haConf.set(YarnConfiguration.RM_HA_ID, rmId);
  haConf.set(YarnConfiguration.RM_WEBAPP_ADDRESS, localhostAnyPort);
  haConf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);

  // Give every service address of every RM id the "localhost:0" value.
  for (String addressKey :
      YarnConfiguration.getServiceAddressConfKeys(haConf)) {
    for (String haId : HAUtil.getRMHAIds(haConf)) {
      String suffixedKey = HAUtil.addSuffix(addressKey, haId);
      haConf.set(suffixedKey, localhostAnyPort);
    }
  }

  // Set last so it overrides any value the loop above wrote for this key.
  String adminAddressKey =
      HAUtil.addSuffix(YarnConfiguration.RM_ADMIN_ADDRESS, rmId);
  haConf.set(adminAddressKey, "localhost:" + adminPort);
  return haConf;
}
/**
 * When failing over, new active RM should read from current state of store,
 * including any updates made while the new active RM was in standby.
 *
 * <p>Both RMs are closed in a {@code finally} block so that a failed
 * assertion does not leave them running.
 *
 * @throws Exception if starting, transitioning or closing an RM fails, or
 *     if the configuration store cannot be accessed
 */
@Test
public void testFailoverReadsFromUpdatedStore() throws Exception {
  HAServiceProtocol.StateChangeRequestInfo req =
      new HAServiceProtocol.StateChangeRequestInfo(
          HAServiceProtocol.RequestSource.REQUEST_BY_USER);

  Configuration conf1 = createRMHAConf("rm1,rm2", "rm1", 1234);
  ResourceManager rm1 = new MockRM(conf1);
  ResourceManager rm2 = null;
  try {
    rm1.start();
    rm1.getRMContext().getRMAdminService().transitionToActive(req);
    assertEquals("RM with ZKStore didn't start",
        Service.STATE.STARTED, rm1.getServiceState());
    assertEquals("RM should be Active",
        HAServiceProtocol.HAServiceState.ACTIVE,
        rm1.getRMContext().getRMAdminService().getServiceStatus().getState());
    assertNull(((MutableConfScheduler) rm1.getResourceScheduler())
        .getConfiguration().get("key"));

    Configuration conf2 = createRMHAConf("rm1,rm2", "rm2", 5678);
    rm2 = new MockRM(conf2);
    rm2.start();
    assertEquals("RM should be Standby",
        HAServiceProtocol.HAServiceState.STANDBY,
        rm2.getRMContext().getRMAdminService().getServiceStatus().getState());

    // Update configuration on RM1
    SchedConfUpdateInfo schedConfUpdateInfo = new SchedConfUpdateInfo();
    schedConfUpdateInfo.getGlobalParams().put("key", "val");
    MutableConfigurationProvider confProvider = ((MutableConfScheduler)
        rm1.getResourceScheduler()).getMutableConfProvider();
    UserGroupInformation user = UserGroupInformation
        .createUserForTesting(TEST_USER, new String[0]);
    confProvider.logAndApplyMutation(user, schedConfUpdateInfo);
    rm1.getResourceScheduler().reinitialize(conf1, rm1.getRMContext());
    assertEquals("val", ((MutableConfScheduler) rm1.getResourceScheduler())
        .getConfiguration().get("key"));
    confProvider.confirmPendingMutation(true);
    assertEquals("val", ((MutableCSConfigurationProvider) confProvider)
        .getConfStore().retrieve().get("key"));

    // Next update is not persisted, it should not be recovered
    schedConfUpdateInfo.getGlobalParams().put("key", "badVal");
    confProvider.logAndApplyMutation(user, schedConfUpdateInfo);

    // Start RM2 and verify it starts with the updated configuration
    rm2.getRMContext().getRMAdminService().transitionToActive(req);
    assertEquals("RM with ZKStore didn't start",
        Service.STATE.STARTED, rm2.getServiceState());
    assertEquals("RM should be Active",
        HAServiceProtocol.HAServiceState.ACTIVE,
        rm2.getRMContext().getRMAdminService().getServiceStatus().getState());

    // Poll until rm1 is no longer ACTIVE. The iteration bound and sleep
    // interval are unchanged; the loop now simply stops as soon as rm1 has
    // left ACTIVE state instead of re-querying its status until the bound.
    for (int i = 0; i < ZK_TIMEOUT_MS / 50; i++) {
      if (HAServiceProtocol.HAServiceState.ACTIVE !=
          rm1.getRMContext().getRMAdminService().getServiceStatus()
              .getState()) {
        break;
      }
      Thread.sleep(100);
    }
    assertEquals("RM should have been fenced",
        HAServiceProtocol.HAServiceState.STANDBY,
        rm1.getRMContext().getRMAdminService().getServiceStatus().getState());
    assertEquals("RM should be Active",
        HAServiceProtocol.HAServiceState.ACTIVE,
        rm2.getRMContext().getRMAdminService().getServiceStatus().getState());

    assertEquals("val", ((MutableCSConfigurationProvider) (
        (CapacityScheduler) rm2.getResourceScheduler())
        .getMutableConfProvider()).getConfStore().retrieve().get("key"));
    assertEquals("val", ((MutableConfScheduler) rm2.getResourceScheduler())
        .getConfiguration().get("key"));

    // Transition to standby will set RM's HA status and then reinitialize in
    // a separate thread. Despite asserting for STANDBY state, it's
    // possible for reinitialization to be unfinished. Wait here for it to
    // finish, otherwise closing rm1 will close zkManager and the unfinished
    // reinitialization will throw an exception.
    Thread.sleep(10000);
  } finally {
    // Close rm2 even if closing rm1 throws.
    try {
      rm1.close();
    } finally {
      if (rm2 != null) {
        rm2.close();
      }
    }
  }
}
/**
 * Supplies the store implementation exercised by this test class.
 *
 * @return a newly constructed {@link ZKConfigurationStore}
 */
@Override
public YarnConfigurationStore createConfStore() {
  YarnConfigurationStore zkStore = new ZKConfigurationStore();
  return zkStore;
}
}