diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestSchedConfCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestSchedConfCLI.java index 4233b4c386..3b961df61d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestSchedConfCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestSchedConfCLI.java @@ -45,6 +45,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.YarnConfigurationStore.LogMutation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfigurationProvider; import org.apache.hadoop.yarn.server.resourcemanager.webapp.JAXBContextResolver; @@ -247,10 +248,10 @@ public void testFormatSchedulerConf() throws Exception { globalUpdates.put("schedKey1", "schedVal1"); schedUpdateInfo.setGlobalParams(globalUpdates); - provider.logAndApplyMutation(UserGroupInformation.getCurrentUser(), - schedUpdateInfo); + LogMutation log = provider.logAndApplyMutation( + UserGroupInformation.getCurrentUser(), schedUpdateInfo); rm.getRMContext().getRMAdminService().refreshQueues(); - provider.confirmPendingMutation(true); + provider.confirmPendingMutation(log, true); Configuration schedulerConf = provider.getConfiguration(); assertEquals("schedVal1", schedulerConf.get("schedKey1")); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java index 03902e384d..751c9a38e3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java @@ -20,6 +20,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.YarnConfigurationStore.LogMutation; import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo; import java.io.IOException; @@ -46,18 +47,21 @@ public interface MutableConfigurationProvider { * Log user's requested configuration mutation, and applies it in-memory. * @param user User who requested the change * @param confUpdate User's requested configuration change + * @return LogMutation with update info from given SchedConfUpdateInfo * @throws Exception if logging the mutation fails */ - void logAndApplyMutation(UserGroupInformation user, SchedConfUpdateInfo - confUpdate) throws Exception; + LogMutation logAndApplyMutation(UserGroupInformation user, + SchedConfUpdateInfo confUpdate) throws Exception; /** * Confirm last logged mutation. + * @param pendingMutation the log mutation to apply * @param isValid if the last logged mutation is applied to scheduler * properly. * @throws Exception if confirming mutation fails */ - void confirmPendingMutation(boolean isValid) throws Exception; + void confirmPendingMutation(LogMutation pendingMutation, + boolean isValid) throws Exception; /** * Returns scheduler configuration cached in this provider. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/FSSchedulerConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/FSSchedulerConfigurationStore.java index 855939e057..eeb38d38db 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/FSSchedulerConfigurationStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/FSSchedulerConfigurationStore.java @@ -58,7 +58,6 @@ public class FSSchedulerConfigurationStore extends YarnConfigurationStore { private int maxVersion; private Path schedulerConfDir; private FileSystem fileSystem; - private LogMutation pendingMutation; private PathFilter configFilePathFilter; private volatile Configuration schedConf; private volatile Configuration oldConf; @@ -134,10 +133,9 @@ public boolean accept(Path path) { */ @Override public void logMutation(LogMutation logMutation) throws IOException { - pendingMutation = logMutation; LOG.info(new GsonBuilder().serializeNulls().create().toJson(logMutation)); oldConf = new Configuration(schedConf); - Map mutations = pendingMutation.getUpdates(); + Map mutations = logMutation.getUpdates(); for (Map.Entry kv : mutations.entrySet()) { if (kv.getValue() == null) { this.schedConf.unset(kv.getKey()); @@ -149,12 +147,14 @@ public void logMutation(LogMutation logMutation) throws IOException { } /** + * @param pendingMutation the log mutation to apply * @param isValid if true, finalize temp configuration file * if false, remove temp configuration file and rollback * @throws Exception throw IOE when write temp configuration file fail */ @Override - public void confirmMutation(boolean isValid) throws Exception { + public void confirmMutation(LogMutation pendingMutation, + boolean isValid) throws Exception { if (pendingMutation == null || tempConfigPath == null) { LOG.warn("pendingMutation or tempConfigPath is null, do nothing"); return; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/InMemoryConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/InMemoryConfigurationStore.java index 47dd6bdfe6..d031ea984f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/InMemoryConfigurationStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/InMemoryConfigurationStore.java @@ -32,7 +32,6 @@ public class InMemoryConfigurationStore extends YarnConfigurationStore { private Configuration schedConf; - private LogMutation pendingMutation; private long configVersion; @Override @@ -42,13 +41,17 @@ public void initialize(Configuration conf, Configuration schedConf, this.configVersion = 1L; } + /** + * This method does not log as it does not support backing store. + * The mutation to be applied on top of schedConf will be directly passed + * in confirmMutation. + */ @Override public void logMutation(LogMutation logMutation) { - pendingMutation = logMutation; } @Override - public void confirmMutation(boolean isValid) { + public void confirmMutation(LogMutation pendingMutation, boolean isValid) { if (isValid) { for (Map.Entry kv : pendingMutation.getUpdates() .entrySet()) { @@ -60,7 +63,6 @@ public void confirmMutation(boolean isValid) { } this.configVersion = this.configVersion + 1L; } - pendingMutation = null; } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java index 39cd8ff9f2..bcdfb5924d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java @@ -75,7 +75,6 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore { private DB versiondb; private long maxLogs; private Configuration conf; - private LogMutation pendingMutation; @VisibleForTesting protected static final Version CURRENT_VERSION_INFO = Version .newInstance(0, 1); @@ -232,11 +231,11 @@ public void logMutation(LogMutation logMutation) throws IOException { } db.put(bytes(LOG_KEY), serLogMutations(logs)); } - pendingMutation = logMutation; } @Override - public void confirmMutation(boolean isValid) throws IOException { + public void confirmMutation(LogMutation pendingMutation, + boolean isValid) throws IOException { WriteBatch updateBatch = db.createWriteBatch(); if (isValid) { for (Map.Entry changes : @@ -252,7 +251,6 @@ public void confirmMutation(boolean isValid) throws IOException { bytes(String.valueOf(configVersion))); } db.write(updateBatch); - pendingMutation = null; } private byte[] serLogMutations(LinkedList mutations) throws diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java index c8d0a0c6ea..09146408fb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java @@ -128,7 +128,7 @@ public ConfigurationMutationACLPolicy getAclMutationPolicy() { } @Override - public void logAndApplyMutation(UserGroupInformation user, + public LogMutation logAndApplyMutation(UserGroupInformation user, SchedConfUpdateInfo confUpdate) throws Exception { oldConf = new Configuration(schedConf); Map kvUpdate = constructKeyValueConfUpdate(confUpdate); @@ -141,6 +141,7 @@ public void logAndApplyMutation(UserGroupInformation user, schedConf.set(kv.getKey(), kv.getValue()); } } + return log; } @Override @@ -184,10 +185,11 @@ public void revertToOldConfig(Configuration config) throws Exception { } @Override - public void confirmPendingMutation(boolean isValid) throws Exception { + public void confirmPendingMutation(LogMutation pendingMutation, + boolean isValid) throws Exception { formatLock.readLock().lock(); try { - confStore.confirmMutation(isValid); + confStore.confirmMutation(pendingMutation, isValid); if (!isValid) { schedConf = oldConf; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java index 6af11a31d6..34aa17428c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java @@ -52,7 +52,7 @@ public abstract class YarnConfigurationStore { * LogMutation encapsulates the fields needed for configuration mutation * audit logging and recovery. */ - static class LogMutation implements Serializable { + public static class LogMutation implements Serializable { private Map updates; private String user; @@ -113,11 +113,13 @@ public void close() throws IOException {} * last logged by {@code logMutation} and marks the mutation as persisted (no * longer pending). If isValid is true, merge the mutation with the persisted * configuration. + * @param pendingMutation the log mutation to apply * @param isValid if true, update persisted configuration with pending * mutation. * @throws Exception if mutation confirmation fails */ - public abstract void confirmMutation(boolean isValid) throws Exception; + public abstract void confirmMutation(LogMutation pendingMutation, + boolean isValid) throws Exception; /** * Retrieve the persisted configuration. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ZKConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ZKConfigurationStore.java index 75ae727dff..0fa48b4939 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ZKConfigurationStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ZKConfigurationStore.java @@ -54,7 +54,6 @@ public class ZKConfigurationStore extends YarnConfigurationStore { protected static final Version CURRENT_VERSION_INFO = Version .newInstance(0, 1); private Configuration conf; - private LogMutation pendingMutation; private String znodeParentPath; @@ -175,12 +174,11 @@ public void logMutation(LogMutation logMutation) throws Exception { zkManager.safeSetData(logsPath, serializeObject(logs), -1, zkAcl, fencingNodePath); } - pendingMutation = logMutation; } @Override - public void confirmMutation(boolean isValid) - throws Exception { + public void confirmMutation(LogMutation pendingMutation, + boolean isValid) throws Exception { if (isValid) { Configuration storedConfigs = retrieve(); Map mapConf = new HashMap<>(); @@ -201,7 +199,6 @@ public void confirmMutation(boolean isValid) zkManager.setData(confVersionPath, String.valueOf(configVersion), -1); } - pendingMutation = null; } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java index e63dc068d1..bdd8e6456a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java @@ -148,6 +148,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.YarnConfigurationStore.LogMutation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; @@ -2643,14 +2644,15 @@ public Void run() throws Exception { throw new org.apache.hadoop.security.AccessControlException("User" + " is not admin of all modified queues."); } - provider.logAndApplyMutation(callerUGI, mutationInfo); + LogMutation logMutation = provider.logAndApplyMutation(callerUGI, + mutationInfo); try { rm.getRMContext().getRMAdminService().refreshQueues(); } catch (IOException | YarnException e) { - provider.confirmPendingMutation(false); + provider.confirmPendingMutation(logMutation, false); throw e; } - provider.confirmPendingMutation(true); + provider.confirmPendingMutation(logMutation, true); return null; } }); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ConfigurationStoreBaseTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ConfigurationStoreBaseTest.java index 0f50b532b6..4b3153a0d5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ConfigurationStoreBaseTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ConfigurationStoreBaseTest.java @@ -64,7 +64,7 @@ public void testConfigurationUpdate() throws Exception { YarnConfigurationStore.LogMutation mutation1 = new YarnConfigurationStore.LogMutation(update1, TEST_USER); confStore.logMutation(mutation1); - confStore.confirmMutation(true); + confStore.confirmMutation(mutation1, true); assertEquals("valUpdate1", confStore.retrieve().get("keyUpdate1")); Map update2 = new HashMap<>(); @@ -72,7 +72,7 @@ public void testConfigurationUpdate() throws Exception { YarnConfigurationStore.LogMutation mutation2 = new YarnConfigurationStore.LogMutation(update2, TEST_USER); confStore.logMutation(mutation2); - confStore.confirmMutation(false); + confStore.confirmMutation(mutation2, false); assertNull("Configuration should not be updated", confStore.retrieve().get("keyUpdate2")); confStore.close(); @@ -89,7 +89,7 @@ public void testNullConfigurationUpdate() throws Exception { YarnConfigurationStore.LogMutation mutation = new YarnConfigurationStore.LogMutation(update, TEST_USER); confStore.logMutation(mutation); - confStore.confirmMutation(true); + confStore.confirmMutation(mutation, true); assertNull(confStore.retrieve().get("key")); confStore.close(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestFSSchedulerConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestFSSchedulerConfigurationStore.java index 796837291f..e4ca3d3ada 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestFSSchedulerConfigurationStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestFSSchedulerConfigurationStore.java @@ -100,7 +100,7 @@ public void confirmMutationWithValid() throws Exception { LogMutation logMutation = new LogMutation(updates, "test"); configurationStore.logMutation(logMutation); - configurationStore.confirmMutation(true); + configurationStore.confirmMutation(logMutation, true); storeConf = configurationStore.retrieve(); assertEquals(null, storeConf.get("a")); assertEquals("bb", storeConf.get("b")); @@ -110,7 +110,7 @@ public void confirmMutationWithValid() throws Exception { updates.put("b", "bbb"); configurationStore.logMutation(logMutation); - configurationStore.confirmMutation(true); + configurationStore.confirmMutation(logMutation, true); storeConf = configurationStore.retrieve(); assertEquals(null, storeConf.get("a")); assertEquals("bbb", storeConf.get("b")); @@ -133,7 +133,7 @@ public void confirmMutationWithInValid() throws Exception { LogMutation logMutation = new LogMutation(updates, "test"); configurationStore.logMutation(logMutation); - configurationStore.confirmMutation(false); + configurationStore.confirmMutation(logMutation, false); storeConf = configurationStore.retrieve(); compareConfig(conf, storeConf); @@ -168,7 +168,7 @@ public void testFileSystemClose() throws Exception { updates.put("testkey", "testvalue"); LogMutation logMutation = new LogMutation(updates, "test"); configStore.logMutation(logMutation); - configStore.confirmMutation(true); + configStore.confirmMutation(logMutation, true); } catch (IOException e) { if (e.getMessage().contains("Filesystem closed")) { fail("FSSchedulerConfigurationStore failed to handle " + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestLeveldbConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestLeveldbConfigurationStore.java index 5f78aa2807..0ae7624d78 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestLeveldbConfigurationStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestLeveldbConfigurationStore.java @@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfigurationProvider; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.YarnConfigurationStore.LogMutation; import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo; import org.junit.Before; import org.junit.Test; @@ -103,7 +104,7 @@ public void testPersistUpdatedConfiguration() throws Exception { YarnConfigurationStore.LogMutation mutation = new YarnConfigurationStore.LogMutation(update, TEST_USER); confStore.logMutation(mutation); - confStore.confirmMutation(true); + confStore.confirmMutation(mutation, true); assertEquals("val", confStore.retrieve().get("key")); confStore.close(); @@ -159,7 +160,7 @@ public void testMaxLogs() throws Exception { logs = ((LeveldbConfigurationStore) confStore).getLogs(); assertEquals(1, logs.size()); assertEquals("val1", logs.get(0).getUpdates().get("key1")); - confStore.confirmMutation(true); + confStore.confirmMutation(mutation, true); assertEquals(1, logs.size()); assertEquals("val1", logs.get(0).getUpdates().get("key1")); @@ -171,7 +172,7 @@ public void testMaxLogs() throws Exception { assertEquals(2, logs.size()); assertEquals("val1", logs.get(0).getUpdates().get("key1")); assertEquals("val2", logs.get(1).getUpdates().get("key2")); - confStore.confirmMutation(true); + confStore.confirmMutation(mutation, true); assertEquals(2, logs.size()); assertEquals("val1", logs.get(0).getUpdates().get("key1")); assertEquals("val2", logs.get(1).getUpdates().get("key2")); @@ -185,7 +186,7 @@ public void testMaxLogs() throws Exception { assertEquals(2, logs.size()); assertEquals("val2", logs.get(0).getUpdates().get("key2")); assertEquals("val3", logs.get(1).getUpdates().get("key3")); - confStore.confirmMutation(true); + confStore.confirmMutation(mutation, true); assertEquals(2, logs.size()); assertEquals("val2", logs.get(0).getUpdates().get("key2")); assertEquals("val3", logs.get(1).getUpdates().get("key3")); @@ -211,16 +212,17 @@ public void testRestartReadsFromUpdatedStore() throws Exception { rm1.getResourceScheduler()).getMutableConfProvider(); UserGroupInformation user = UserGroupInformation .createUserForTesting(TEST_USER, new String[0]); - confProvider.logAndApplyMutation(user, schedConfUpdateInfo); + LogMutation log = confProvider.logAndApplyMutation(user, + schedConfUpdateInfo); rm1.getResourceScheduler().reinitialize(conf, rm1.getRMContext()); assertEquals("val", ((MutableConfScheduler) rm1.getResourceScheduler()) .getConfiguration().get("key")); - confProvider.confirmPendingMutation(true); + confProvider.confirmPendingMutation(log, true); assertEquals("val", ((MutableCSConfigurationProvider) confProvider) .getConfStore().retrieve().get("key")); // Next update is not persisted, it should not be recovered schedConfUpdateInfo.getGlobalParams().put("key", "badVal"); - confProvider.logAndApplyMutation(user, schedConfUpdateInfo); + log = confProvider.logAndApplyMutation(user, schedConfUpdateInfo); rm1.close(); // Start RM2 and verifies it starts with updated configuration diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestMutableCSConfigurationProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestMutableCSConfigurationProvider.java index cb416e2515..0c9a312056 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestMutableCSConfigurationProvider.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestMutableCSConfigurationProvider.java @@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.YarnConfigurationStore.LogMutation; import org.apache.hadoop.yarn.webapp.dao.QueueConfigInfo; import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo; import org.junit.Before; @@ -93,15 +94,15 @@ public void testInMemoryBackedProvider() throws Exception { assertNull(confProvider.loadConfiguration(conf) .get("yarn.scheduler.capacity.root.a.goodKey")); - confProvider.logAndApplyMutation(TEST_USER, goodUpdate); - confProvider.confirmPendingMutation(true); + LogMutation log = confProvider.logAndApplyMutation(TEST_USER, goodUpdate); + confProvider.confirmPendingMutation(log, true); assertEquals("goodVal", confProvider.loadConfiguration(conf) .get("yarn.scheduler.capacity.root.a.goodKey")); assertNull(confProvider.loadConfiguration(conf).get( "yarn.scheduler.capacity.root.a.badKey")); - confProvider.logAndApplyMutation(TEST_USER, badUpdate); - confProvider.confirmPendingMutation(false); + log = confProvider.logAndApplyMutation(TEST_USER, badUpdate); + confProvider.confirmPendingMutation(log, false); assertNull(confProvider.loadConfiguration(conf).get( "yarn.scheduler.capacity.root.a.badKey")); @@ -125,8 +126,8 @@ public void testRemoveQueueConfig() throws Exception { QueueConfigInfo("root.a", updateMap); updateInfo.getUpdateQueueInfo().add(queueConfigInfo); - confProvider.logAndApplyMutation(TEST_USER, updateInfo); - confProvider.confirmPendingMutation(true); + LogMutation log = confProvider.logAndApplyMutation(TEST_USER, updateInfo); + confProvider.confirmPendingMutation(log, true); assertEquals("testval1", confProvider.loadConfiguration(conf) .get("yarn.scheduler.capacity.root.a.testkey1")); assertEquals("testval2", confProvider.loadConfiguration(conf) @@ -138,8 +139,8 @@ public void testRemoveQueueConfig() throws Exception { queueConfigInfo = new QueueConfigInfo("root.a", updateMap); updateInfo.getUpdateQueueInfo().add(queueConfigInfo); - confProvider.logAndApplyMutation(TEST_USER, updateInfo); - confProvider.confirmPendingMutation(true); + log = confProvider.logAndApplyMutation(TEST_USER, updateInfo); + confProvider.confirmPendingMutation(log, true); assertNull("Failed to remove config", confProvider.loadConfiguration(conf) .get("yarn.scheduler.capacity.root.a.testkey1")); @@ -147,6 +148,38 @@ public void testRemoveQueueConfig() throws Exception { .get("yarn.scheduler.capacity.root.a.testkey2")); } + @Test + public void testMultipleUpdatesNotLost() throws Exception { + Configuration conf = new Configuration(); + conf.set(YarnConfiguration.SCHEDULER_CONFIGURATION_STORE_CLASS, + YarnConfiguration.MEMORY_CONFIGURATION_STORE); + confProvider.init(conf); + + SchedConfUpdateInfo updateInfo1 = new SchedConfUpdateInfo(); + Map updateMap1 = new HashMap<>(); + updateMap1.put("key1", "val1"); + QueueConfigInfo queueConfigInfo1 = new + QueueConfigInfo("root.a", updateMap1); + updateInfo1.getUpdateQueueInfo().add(queueConfigInfo1); + LogMutation log1 = confProvider.logAndApplyMutation(TEST_USER, updateInfo1); + + SchedConfUpdateInfo updateInfo2 = new SchedConfUpdateInfo(); + Map updateMap2 = new HashMap<>(); + updateMap2.put("key2", "val2"); + QueueConfigInfo queueConfigInfo2 = new + QueueConfigInfo("root.a", updateMap2); + updateInfo2.getUpdateQueueInfo().add(queueConfigInfo2); + LogMutation log2 = confProvider.logAndApplyMutation(TEST_USER, updateInfo2); + + confProvider.confirmPendingMutation(log1, true); + confProvider.confirmPendingMutation(log2, true); + + assertEquals("val1", confProvider.loadConfiguration(conf) + .get("yarn.scheduler.capacity.root.a.key1")); + assertEquals("val2", confProvider.loadConfiguration(conf) + .get("yarn.scheduler.capacity.root.a.key2")); + } + @Test public void testHDFSBackedProvider() throws Exception { File testSchedulerConfigurationDir = new File( @@ -166,15 +199,15 @@ public void testHDFSBackedProvider() throws Exception { assertNull(confProvider.loadConfiguration(conf) .get("yarn.scheduler.capacity.root.a.goodKey")); - confProvider.logAndApplyMutation(TEST_USER, goodUpdate); - confProvider.confirmPendingMutation(true); + LogMutation log = confProvider.logAndApplyMutation(TEST_USER, goodUpdate); + confProvider.confirmPendingMutation(log, true); assertEquals("goodVal", confProvider.loadConfiguration(conf) .get("yarn.scheduler.capacity.root.a.goodKey")); assertNull(confProvider.loadConfiguration(conf).get( "yarn.scheduler.capacity.root.a.badKey")); - confProvider.logAndApplyMutation(TEST_USER, badUpdate); - confProvider.confirmPendingMutation(false); + log = confProvider.logAndApplyMutation(TEST_USER, badUpdate); + confProvider.confirmPendingMutation(log, false); assertNull(confProvider.loadConfiguration(conf).get( "yarn.scheduler.capacity.root.a.badKey")); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestZKConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestZKConfigurationStore.java index 57ccd75d58..e67e382f71 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestZKConfigurationStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestZKConfigurationStore.java @@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfigurationProvider; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.YarnConfigurationStore.LogMutation; import org.apache.hadoop.yarn.webapp.dao.QueueConfigInfo; import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo; import org.junit.After; @@ -148,7 +149,7 @@ public void testGetConfigurationVersion() throws Exception { YarnConfigurationStore.LogMutation mutation = new YarnConfigurationStore.LogMutation(update, TEST_USER); confStore.logMutation(mutation); - confStore.confirmMutation(true); + confStore.confirmMutation(mutation, true); long v2 = confStore.getConfigVersion(); assertEquals(2, v2); } @@ -160,10 +161,9 @@ public void testPersistUpdatedConfiguration() throws Exception { Map update = new HashMap<>(); update.put("key", "val"); - YarnConfigurationStore.LogMutation mutation = - new YarnConfigurationStore.LogMutation(update, TEST_USER); + LogMutation mutation = new LogMutation(update, TEST_USER); confStore.logMutation(mutation); - confStore.confirmMutation(true); + confStore.confirmMutation(mutation, true); assertEquals("val", confStore.retrieve().get("key")); // Create a new configuration store, and check for updated configuration @@ -190,7 +190,7 @@ public void testMaxLogs() throws Exception { logs = ((ZKConfigurationStore) confStore).getLogs(); assertEquals(1, logs.size()); assertEquals("val1", logs.get(0).getUpdates().get("key1")); - confStore.confirmMutation(true); + confStore.confirmMutation(mutation, true); assertEquals(1, logs.size()); assertEquals("val1", logs.get(0).getUpdates().get("key1")); @@ -202,7 +202,7 @@ public void testMaxLogs() throws Exception { assertEquals(2, logs.size()); assertEquals("val1", logs.get(0).getUpdates().get("key1")); assertEquals("val2", logs.get(1).getUpdates().get("key2")); - confStore.confirmMutation(true); + confStore.confirmMutation(mutation, true); assertEquals(2, logs.size()); assertEquals("val1", logs.get(0).getUpdates().get("key1")); assertEquals("val2", logs.get(1).getUpdates().get("key2")); @@ -216,7 +216,7 @@ public void testMaxLogs() throws Exception { assertEquals(2, logs.size()); assertEquals("val2", logs.get(0).getUpdates().get("key2")); assertEquals("val3", logs.get(1).getUpdates().get("key3")); - confStore.confirmMutation(true); + confStore.confirmMutation(mutation, true); assertEquals(2, logs.size()); assertEquals("val2", logs.get(0).getUpdates().get("key2")); assertEquals("val3", logs.get(1).getUpdates().get("key3")); @@ -308,16 +308,17 @@ public void testFailoverReadsFromUpdatedStore() throws Exception { rm1.getResourceScheduler()).getMutableConfProvider(); UserGroupInformation user = UserGroupInformation .createUserForTesting(TEST_USER, new String[0]); - confProvider.logAndApplyMutation(user, schedConfUpdateInfo); + LogMutation log = confProvider.logAndApplyMutation(user, + schedConfUpdateInfo); rm1.getResourceScheduler().reinitialize(conf1, rm1.getRMContext()); assertEquals("val", ((MutableConfScheduler) rm1.getResourceScheduler()) .getConfiguration().get("key")); - confProvider.confirmPendingMutation(true); + confProvider.confirmPendingMutation(log, true); assertEquals("val", ((MutableCSConfigurationProvider) confProvider) .getConfStore().retrieve().get("key")); // Next update is not persisted, it should not be recovered schedConfUpdateInfo.getGlobalParams().put("key", "badVal"); - confProvider.logAndApplyMutation(user, schedConfUpdateInfo); + log = confProvider.logAndApplyMutation(user, schedConfUpdateInfo); // Start RM2 and verifies it starts with updated configuration rm2.getRMContext().getRMAdminService().transitionToActive(req); @@ -400,9 +401,10 @@ public void testFailoverAfterRemoveQueue() throws Exception { stopParams.put("capacity", "0"); QueueConfigInfo stopInfo = new QueueConfigInfo("root.default", stopParams); schedConfUpdateInfo.getUpdateQueueInfo().add(stopInfo); - confProvider.logAndApplyMutation(user, schedConfUpdateInfo); + LogMutation log = confProvider.logAndApplyMutation(user, + schedConfUpdateInfo); rm1.getResourceScheduler().reinitialize(conf1, rm1.getRMContext()); - confProvider.confirmPendingMutation(true); + confProvider.confirmPendingMutation(log, true); assertTrue(Arrays.asList(((MutableConfScheduler) rm1.getResourceScheduler()) .getConfiguration().get("yarn.scheduler.capacity.root.queues").split (",")).contains("a")); @@ -411,9 +413,9 @@ public void testFailoverAfterRemoveQueue() throws Exception { schedConfUpdateInfo.getUpdateQueueInfo().clear(); schedConfUpdateInfo.getAddQueueInfo().clear(); schedConfUpdateInfo.getRemoveQueueInfo().add("root.default"); - confProvider.logAndApplyMutation(user, schedConfUpdateInfo); + log = confProvider.logAndApplyMutation(user, schedConfUpdateInfo); rm1.getResourceScheduler().reinitialize(conf1, rm1.getRMContext()); - confProvider.confirmPendingMutation(true); + confProvider.confirmPendingMutation(log, true); assertEquals("a", ((MutableConfScheduler) rm1.getResourceScheduler()) .getConfiguration().get("yarn.scheduler.capacity.root.queues"));