diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java index bcdfb5924d..b43bcb839c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java @@ -344,11 +344,15 @@ protected DB getDB() { @Override public void storeVersion() throws Exception { - String key = VERSION_KEY; - byte[] data = ((VersionPBImpl) CURRENT_VERSION_INFO).getProto() + storeVersion(CURRENT_VERSION_INFO); + } + + @VisibleForTesting + protected void storeVersion(Version version) throws Exception { + byte[] data = ((VersionPBImpl) version).getProto() .toByteArray(); try { - db.put(bytes(key), data); + db.put(bytes(VERSION_KEY), data); } catch (DBException e) { throw new IOException(e); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfStoreVersionIncompatibleException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfStoreVersionIncompatibleException.java new file mode 100644 index 0000000000..2213ee24c6 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfStoreVersionIncompatibleException.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf; + +import org.apache.hadoop.yarn.exceptions.YarnException; + +/** + * This exception is thrown by {@link YarnConfigurationStore} if it's loading + * an incompatible persisted schema version. + */ +public class YarnConfStoreVersionIncompatibleException extends + YarnException { + private static final long serialVersionUID = -2829858253579013629L; + + public YarnConfStoreVersionIncompatibleException(Throwable cause) { + super(cause); + } + + public YarnConfStoreVersionIncompatibleException(String message) { + super(message); + } + + public YarnConfStoreVersionIncompatibleException( + String message, Throwable cause) { + super(message, cause); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java index 34aa17428c..e8e3ecf87c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java @@ -23,7 +23,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.server.records.Version; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateVersionIncompatibleException; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import java.io.IOException; @@ -103,6 +102,7 @@ public void close() throws IOException {} /** * Logs the configuration change to backing store. + * * @param logMutation configuration change to be persisted in write ahead log * @throws IOException if logging fails */ @@ -169,23 +169,22 @@ public abstract void confirmMutation(LogMutation pendingMutation, protected abstract Version getCurrentVersion(); public void checkVersion() throws Exception { - // TODO this was taken from RMStateStore. Should probably refactor Version loadedVersion = getConfStoreVersion(); - LOG.info("Loaded configuration store version info " + loadedVersion); - if (loadedVersion != null && loadedVersion.equals(getCurrentVersion())) { + Version currentVersion = getCurrentVersion(); + LOG.info("Loaded configuration store version info {}", loadedVersion); + + // when hard-coded schema version (currentVersion) is null the version check + // is unnecessary + if (currentVersion == null || currentVersion.equals(loadedVersion)) { return; } // if there is no version info, treat it as CURRENT_VERSION_INFO; - if (loadedVersion == null) { - loadedVersion = getCurrentVersion(); - } - if (loadedVersion.isCompatibleTo(getCurrentVersion())) { - LOG.info("Storing configuration store version info " - + getCurrentVersion()); + if (loadedVersion == null || loadedVersion.isCompatibleTo(currentVersion)) { + LOG.info("Storing configuration store version info {}", currentVersion); storeVersion(); } else { - throw new RMStateVersionIncompatibleException( - "Expecting configuration store version " + getCurrentVersion() + throw new YarnConfStoreVersionIncompatibleException( + "Expecting configuration store version " + currentVersion + ", but loading version " + loadedVersion); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestInMemoryConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestInMemoryConfigurationStore.java index c40d16a27c..5ebad50fe6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestInMemoryConfigurationStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestInMemoryConfigurationStore.java @@ -18,6 +18,10 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf; +import org.junit.Test; + +import static org.junit.Assert.fail; + /** * Tests {@link InMemoryConfigurationStore}. */ @@ -27,4 +31,13 @@ public class TestInMemoryConfigurationStore extends ConfigurationStoreBaseTest { protected YarnConfigurationStore createConfStore() { return new InMemoryConfigurationStore(); } + + @Test + public void checkVersion() { + try { + confStore.checkVersion(); + } catch (Exception e) { + fail("checkVersion threw exception"); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestLeveldbConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestLeveldbConfigurationStore.java index 0ae7624d78..785a910d78 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestLeveldbConfigurationStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestLeveldbConfigurationStore.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf; +import org.apache.hadoop.yarn.server.records.Version; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.fs.FileUtil; @@ -78,6 +79,23 @@ public void testVersioning() throws Exception { confStore.close(); } + @Test(expected = YarnConfStoreVersionIncompatibleException.class) + public void testIncompatibleVersion() throws Exception { + try { + confStore.initialize(conf, schedConf, rmContext); + + Version otherVersion = Version.newInstance(1, 1); + ((LeveldbConfigurationStore) confStore).storeVersion(otherVersion); + + assertEquals("The configuration store should have stored the new" + + "version.", otherVersion, + confStore.getConfStoreVersion()); + confStore.checkVersion(); + } finally { + confStore.close(); + } + } + @Test public void testPersistConfiguration() throws Exception { schedConf.set("key", "val"); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestZKConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestZKConfigurationStore.java index e67e382f71..6715a0508f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestZKConfigurationStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestZKConfigurationStore.java @@ -19,6 +19,10 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf; import org.apache.hadoop.util.curator.ZKCuratorManager; +import org.apache.hadoop.yarn.server.records.Version; +import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.data.ACL; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.curator.framework.CuratorFramework; @@ -49,6 +53,7 @@ import java.util.Arrays; import java.util.HashMap; import java.util.LinkedList; +import java.util.List; import java.util.Map; import static org.junit.Assert.assertEquals; @@ -113,6 +118,29 @@ public void testVersioning() throws Exception { confStore.getConfStoreVersion()); } + @Test(expected = YarnConfStoreVersionIncompatibleException.class) + public void testIncompatibleVersion() throws Exception { + confStore.initialize(conf, schedConf, rmContext); + + Version otherVersion = Version.newInstance(1, 1); + String znodeParentPath = conf.get(YarnConfiguration. + RM_SCHEDCONF_STORE_ZK_PARENT_PATH, + YarnConfiguration.DEFAULT_RM_SCHEDCONF_STORE_ZK_PARENT_PATH); + String zkVersionPath = ZKCuratorManager.getNodePath(znodeParentPath, + "VERSION"); + String fencingNodePath = ZKCuratorManager.getNodePath(znodeParentPath, + "FENCING"); + byte[] versionData = + ((VersionPBImpl) otherVersion).getProto().toByteArray(); + List zkAcl = ZKCuratorManager.getZKAcls(conf); + ((ZKConfigurationStore) confStore).zkManager.safeCreate(zkVersionPath, + versionData, zkAcl, CreateMode.PERSISTENT, zkAcl, fencingNodePath); + + assertEquals("The configuration store should have stored the new" + + "version.", otherVersion, confStore.getConfStoreVersion()); + confStore.checkVersion(); + } + @Test public void testPersistConfiguration() throws Exception { schedConf.set("key", "val");