YARN.10003. YarnConfigurationStore#checkVersion throws exception that belongs to RMStateStore. Contributed by Benjamin Teke

This commit is contained in:
Szilard Nemeth 2020-03-06 17:49:45 +01:00
parent 004e955348
commit ea0444851d
6 changed files with 120 additions and 15 deletions

View File

@ -344,11 +344,15 @@ protected DB getDB() {
@Override @Override
public void storeVersion() throws Exception { public void storeVersion() throws Exception {
String key = VERSION_KEY; storeVersion(CURRENT_VERSION_INFO);
byte[] data = ((VersionPBImpl) CURRENT_VERSION_INFO).getProto() }
@VisibleForTesting
protected void storeVersion(Version version) throws Exception {
byte[] data = ((VersionPBImpl) version).getProto()
.toByteArray(); .toByteArray();
try { try {
db.put(bytes(key), data); db.put(bytes(VERSION_KEY), data);
} catch (DBException e) { } catch (DBException e) {
throw new IOException(e); throw new IOException(e);
} }

View File

@ -0,0 +1,43 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
import org.apache.hadoop.yarn.exceptions.YarnException;
/**
* This exception is thrown by {@link YarnConfigurationStore} if it's loading
* an incompatible persisted schema version.
*/
public class YarnConfStoreVersionIncompatibleException extends
YarnException {
private static final long serialVersionUID = -2829858253579013629L;
public YarnConfStoreVersionIncompatibleException(Throwable cause) {
super(cause);
}
public YarnConfStoreVersionIncompatibleException(String message) {
super(message);
}
public YarnConfStoreVersionIncompatibleException(
String message, Throwable cause) {
super(message, cause);
}
}

View File

@ -23,7 +23,6 @@
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.server.records.Version; import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateVersionIncompatibleException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import java.io.IOException; import java.io.IOException;
@ -103,6 +102,7 @@ public void close() throws IOException {}
/** /**
* Logs the configuration change to backing store. * Logs the configuration change to backing store.
*
* @param logMutation configuration change to be persisted in write ahead log * @param logMutation configuration change to be persisted in write ahead log
* @throws IOException if logging fails * @throws IOException if logging fails
*/ */
@ -169,23 +169,22 @@ public abstract void confirmMutation(LogMutation pendingMutation,
protected abstract Version getCurrentVersion(); protected abstract Version getCurrentVersion();
public void checkVersion() throws Exception { public void checkVersion() throws Exception {
// TODO this was taken from RMStateStore. Should probably refactor
Version loadedVersion = getConfStoreVersion(); Version loadedVersion = getConfStoreVersion();
LOG.info("Loaded configuration store version info " + loadedVersion); Version currentVersion = getCurrentVersion();
if (loadedVersion != null && loadedVersion.equals(getCurrentVersion())) { LOG.info("Loaded configuration store version info {}", loadedVersion);
// when hard-coded schema version (currentVersion) is null the version check
// is unnecessary
if (currentVersion == null || currentVersion.equals(loadedVersion)) {
return; return;
} }
// if there is no version info, treat it as CURRENT_VERSION_INFO; // if there is no version info, treat it as CURRENT_VERSION_INFO;
if (loadedVersion == null) { if (loadedVersion == null || loadedVersion.isCompatibleTo(currentVersion)) {
loadedVersion = getCurrentVersion(); LOG.info("Storing configuration store version info {}", currentVersion);
}
if (loadedVersion.isCompatibleTo(getCurrentVersion())) {
LOG.info("Storing configuration store version info "
+ getCurrentVersion());
storeVersion(); storeVersion();
} else { } else {
throw new RMStateVersionIncompatibleException( throw new YarnConfStoreVersionIncompatibleException(
"Expecting configuration store version " + getCurrentVersion() "Expecting configuration store version " + currentVersion
+ ", but loading version " + loadedVersion); + ", but loading version " + loadedVersion);
} }
} }

View File

@ -18,6 +18,10 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf; package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
import org.junit.Test;
import static org.junit.Assert.fail;
/** /**
* Tests {@link InMemoryConfigurationStore}. * Tests {@link InMemoryConfigurationStore}.
*/ */
@ -27,4 +31,13 @@ public class TestInMemoryConfigurationStore extends ConfigurationStoreBaseTest {
protected YarnConfigurationStore createConfStore() { protected YarnConfigurationStore createConfStore() {
return new InMemoryConfigurationStore(); return new InMemoryConfigurationStore();
} }
@Test
public void checkVersion() {
try {
confStore.checkVersion();
} catch (Exception e) {
fail("checkVersion threw exception");
}
}
} }

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf; package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
import org.apache.hadoop.yarn.server.records.Version;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.FileUtil;
@ -78,6 +79,23 @@ public void testVersioning() throws Exception {
confStore.close(); confStore.close();
} }
@Test(expected = YarnConfStoreVersionIncompatibleException.class)
public void testIncompatibleVersion() throws Exception {
try {
confStore.initialize(conf, schedConf, rmContext);
Version otherVersion = Version.newInstance(1, 1);
((LeveldbConfigurationStore) confStore).storeVersion(otherVersion);
assertEquals("The configuration store should have stored the new" +
"version.", otherVersion,
confStore.getConfStoreVersion());
confStore.checkVersion();
} finally {
confStore.close();
}
}
@Test @Test
public void testPersistConfiguration() throws Exception { public void testPersistConfiguration() throws Exception {
schedConf.set("key", "val"); schedConf.set("key", "val");

View File

@ -19,6 +19,10 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf; package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
import org.apache.hadoop.util.curator.ZKCuratorManager; import org.apache.hadoop.util.curator.ZKCuratorManager;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.ACL;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFramework;
@ -49,6 +53,7 @@
import java.util.Arrays; import java.util.Arrays;
import java.util.HashMap; import java.util.HashMap;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List;
import java.util.Map; import java.util.Map;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
@ -113,6 +118,29 @@ public void testVersioning() throws Exception {
confStore.getConfStoreVersion()); confStore.getConfStoreVersion());
} }
@Test(expected = YarnConfStoreVersionIncompatibleException.class)
public void testIncompatibleVersion() throws Exception {
confStore.initialize(conf, schedConf, rmContext);
Version otherVersion = Version.newInstance(1, 1);
String znodeParentPath = conf.get(YarnConfiguration.
RM_SCHEDCONF_STORE_ZK_PARENT_PATH,
YarnConfiguration.DEFAULT_RM_SCHEDCONF_STORE_ZK_PARENT_PATH);
String zkVersionPath = ZKCuratorManager.getNodePath(znodeParentPath,
"VERSION");
String fencingNodePath = ZKCuratorManager.getNodePath(znodeParentPath,
"FENCING");
byte[] versionData =
((VersionPBImpl) otherVersion).getProto().toByteArray();
List<ACL> zkAcl = ZKCuratorManager.getZKAcls(conf);
((ZKConfigurationStore) confStore).zkManager.safeCreate(zkVersionPath,
versionData, zkAcl, CreateMode.PERSISTENT, zkAcl, fencingNodePath);
assertEquals("The configuration store should have stored the new" +
"version.", otherVersion, confStore.getConfStoreVersion());
confStore.checkVersion();
}
@Test @Test
public void testPersistConfiguration() throws Exception { public void testPersistConfiguration() throws Exception {
schedConf.set("key", "val"); schedConf.set("key", "val");