YARN-9997. Code cleanup in ZKConfigurationStore. Contributed by Andras Gyori

This commit is contained in:
Szilard Nemeth 2020-03-12 12:29:03 +01:00
parent 38d87883b6
commit 5ead9c15ca
2 changed files with 151 additions and 54 deletions

View File

@ -55,8 +55,6 @@ public class ZKConfigurationStore extends YarnConfigurationStore {
.newInstance(0, 1); .newInstance(0, 1);
private Configuration conf; private Configuration conf;
private String znodeParentPath;
private static final String ZK_VERSION_PATH = "VERSION"; private static final String ZK_VERSION_PATH = "VERSION";
private static final String LOGS_PATH = "LOGS"; private static final String LOGS_PATH = "LOGS";
private static final String CONF_STORE_PATH = "CONF_STORE"; private static final String CONF_STORE_PATH = "CONF_STORE";
@ -69,19 +67,20 @@ public class ZKConfigurationStore extends YarnConfigurationStore {
private String fencingNodePath; private String fencingNodePath;
private String confVersionPath; private String confVersionPath;
@VisibleForTesting private ZKCuratorManager zkManager;
protected ZKCuratorManager zkManager;
private List<ACL> zkAcl; private List<ACL> zkAcl;
@Override @Override
public void initialize(Configuration config, Configuration schedConf, public void initialize(Configuration config, Configuration schedConf,
RMContext rmContext) throws Exception { RMContext rmContext) throws Exception {
this.conf = config; this.conf = config;
String znodeParentPath = conf.get(
YarnConfiguration.RM_SCHEDCONF_STORE_ZK_PARENT_PATH,
YarnConfiguration.DEFAULT_RM_SCHEDCONF_STORE_ZK_PARENT_PATH);
this.maxLogs = conf.getLong(YarnConfiguration.RM_SCHEDCONF_MAX_LOGS, this.maxLogs = conf.getLong(YarnConfiguration.RM_SCHEDCONF_MAX_LOGS,
YarnConfiguration.DEFAULT_RM_SCHEDCONF_ZK_MAX_LOGS); YarnConfiguration.DEFAULT_RM_SCHEDCONF_ZK_MAX_LOGS);
this.znodeParentPath =
conf.get(YarnConfiguration.RM_SCHEDCONF_STORE_ZK_PARENT_PATH,
YarnConfiguration.DEFAULT_RM_SCHEDCONF_STORE_ZK_PARENT_PATH);
this.zkManager = this.zkManager =
rmContext.getResourceManager().createAndStartZKManager(conf); rmContext.getResourceManager().createAndStartZKManager(conf);
this.zkAcl = ZKCuratorManager.getZKAcls(conf); this.zkAcl = ZKCuratorManager.getZKAcls(conf);
@ -95,37 +94,31 @@ public void initialize(Configuration config, Configuration schedConf,
zkManager.createRootDirRecursively(znodeParentPath, zkAcl); zkManager.createRootDirRecursively(znodeParentPath, zkAcl);
zkManager.delete(fencingNodePath); zkManager.delete(fencingNodePath);
if (!zkManager.exists(logsPath)) { if (createNewZkPath(logsPath)) {
zkManager.create(logsPath); setZkData(logsPath, new LinkedList<LogMutation>());
zkManager.setData(logsPath,
serializeObject(new LinkedList<LogMutation>()), -1);
} }
if (!zkManager.exists(confVersionPath)) { if (createNewZkPath(confVersionPath)) {
zkManager.create(confVersionPath); setZkData(confVersionPath, String.valueOf(0));
zkManager.setData(confVersionPath, String.valueOf(0), -1);
} }
if (!zkManager.exists(confStorePath)) { if (createNewZkPath(confStorePath)) {
zkManager.create(confStorePath);
HashMap<String, String> mapSchedConf = new HashMap<>(); HashMap<String, String> mapSchedConf = new HashMap<>();
for (Map.Entry<String, String> entry : schedConf) { for (Map.Entry<String, String> entry : schedConf) {
mapSchedConf.put(entry.getKey(), entry.getValue()); mapSchedConf.put(entry.getKey(), entry.getValue());
} }
zkManager.setData(confStorePath, serializeObject(mapSchedConf), -1); setZkData(confStorePath, mapSchedConf);
long configVersion = getConfigVersion() + 1L; long configVersion = getConfigVersion() + 1L;
zkManager.setData(confVersionPath, String.valueOf(configVersion), -1); setZkData(confVersionPath, String.valueOf(configVersion));
} }
} }
@VisibleForTesting @VisibleForTesting
@Override @Override
protected LinkedList<LogMutation> getLogs() throws Exception { protected LinkedList<LogMutation> getLogs() throws Exception {
return (LinkedList<LogMutation>) return unsafeCast(deserializeObject(getZkData(logsPath)));
deserializeObject(zkManager.getData(logsPath));
} }
// TODO: following version-related code is taken from ZKRMStateStore
@Override @Override
public Version getCurrentVersion() { public Version getCurrentVersion() {
return CURRENT_VERSION_INFO; return CURRENT_VERSION_INFO;
@ -134,7 +127,7 @@ public Version getCurrentVersion() {
@Override @Override
public Version getConfStoreVersion() throws Exception { public Version getConfStoreVersion() throws Exception {
if (zkManager.exists(zkVersionPath)) { if (zkManager.exists(zkVersionPath)) {
byte[] data = zkManager.getData(zkVersionPath); byte[] data = getZkData(zkVersionPath);
return new VersionPBImpl(YarnServerCommonProtos.VersionProto return new VersionPBImpl(YarnServerCommonProtos.VersionProto
.parseFrom(data)); .parseFrom(data));
} }
@ -153,27 +146,25 @@ public synchronized void storeVersion() throws Exception {
((VersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray(); ((VersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray();
if (zkManager.exists(zkVersionPath)) { if (zkManager.exists(zkVersionPath)) {
zkManager.safeSetData(zkVersionPath, data, -1, zkAcl, fencingNodePath); safeSetZkData(zkVersionPath, data);
} else { } else {
zkManager.safeCreate(zkVersionPath, data, zkAcl, CreateMode.PERSISTENT, safeCreateZkData(zkVersionPath, data);
zkAcl, fencingNodePath);
} }
} }
@Override @Override
public void logMutation(LogMutation logMutation) throws Exception { public void logMutation(LogMutation logMutation) throws Exception {
if (maxLogs > 0) { if (maxLogs > 0) {
byte[] storedLogs = zkManager.getData(logsPath); byte[] storedLogs = getZkData(logsPath);
LinkedList<LogMutation> logs = new LinkedList<>(); LinkedList<LogMutation> logs = new LinkedList<>();
if (storedLogs != null) { if (storedLogs != null) {
logs = (LinkedList<LogMutation>) deserializeObject(storedLogs); logs = unsafeCast(deserializeObject(storedLogs));
} }
logs.add(logMutation); logs.add(logMutation);
if (logs.size() > maxLogs) { if (logs.size() > maxLogs) {
logs.remove(logs.removeFirst()); logs.remove(logs.removeFirst());
} }
zkManager.safeSetData(logsPath, serializeObject(logs), -1, zkAcl, safeSetZkData(logsPath, logs);
fencingNodePath);
} }
} }
@ -194,10 +185,9 @@ public void confirmMutation(LogMutation pendingMutation,
mapConf.put(confChange.getKey(), confChange.getValue()); mapConf.put(confChange.getKey(), confChange.getValue());
} }
} }
zkManager.safeSetData(confStorePath, serializeObject(mapConf), -1, safeSetZkData(confStorePath, mapConf);
zkAcl, fencingNodePath);
long configVersion = getConfigVersion() + 1L; long configVersion = getConfigVersion() + 1L;
zkManager.setData(confVersionPath, String.valueOf(configVersion), -1); setZkData(confVersionPath, String.valueOf(configVersion));
} }
} }
@ -206,14 +196,14 @@ public void confirmMutation(LogMutation pendingMutation,
public synchronized Configuration retrieve() { public synchronized Configuration retrieve() {
byte[] serializedSchedConf; byte[] serializedSchedConf;
try { try {
serializedSchedConf = zkManager.getData(confStorePath); serializedSchedConf = getZkData(confStorePath);
} catch (Exception e) { } catch (Exception e) {
LOG.error("Failed to retrieve configuration from zookeeper store", e); LOG.error("Failed to retrieve configuration from zookeeper store", e);
return null; return null;
} }
try { try {
Map<String, String> map = Map<String, String> map =
(HashMap<String, String>) deserializeObject(serializedSchedConf); unsafeCast(deserializeObject(serializedSchedConf));
Configuration c = new Configuration(false); Configuration c = new Configuration(false);
for (Map.Entry<String, String> e : map.entrySet()) { for (Map.Entry<String, String> e : map.entrySet()) {
c.set(e.getKey(), e.getValue()); c.set(e.getKey(), e.getValue());
@ -228,7 +218,14 @@ public synchronized Configuration retrieve() {
@Override @Override
public long getConfigVersion() throws Exception { public long getConfigVersion() throws Exception {
return Long.parseLong(zkManager.getStringData(confVersionPath)); String version = zkManager.getStringData(confVersionPath);
if (version == null) {
throw new IllegalStateException("Config version can not be properly " +
"serialized. Check Zookeeper config version path to locate " +
"the error!");
}
return Long.parseLong(version);
} }
@Override @Override
@ -236,6 +233,55 @@ public List<LogMutation> getConfirmedConfHistory(long fromId) {
return null; // unimplemented return null; // unimplemented
} }
/**
* Creates a new path in Zookeeper only, if it does not already exist.
*
* @param path Value of the Zookeeper path
* @return <code>true</code>if the creation executed; <code>false</code>
* otherwise.
* @throws Exception
*/
private boolean createNewZkPath(String path) throws Exception {
if (!zkManager.exists(path)) {
zkManager.create(path);
return true;
} else {
return false;
}
}
@VisibleForTesting
protected byte[] getZkData(String path) throws Exception {
return zkManager.getData(path);
}
@VisibleForTesting
protected void setZkData(String path, byte[] data) throws Exception {
zkManager.setData(path, data, -1);
}
private void setZkData(String path, Object data) throws Exception {
setZkData(path, serializeObject(data));
}
private void setZkData(String path, String data) throws Exception {
zkManager.setData(path, data, -1);
}
private void safeSetZkData(String path, byte[] data) throws Exception {
zkManager.safeSetData(path, data, -1, zkAcl, fencingNodePath);
}
private void safeSetZkData(String path, Object data) throws Exception {
safeSetZkData(path, serializeObject(data));
}
@VisibleForTesting
protected void safeCreateZkData(String path, byte[] data) throws Exception {
zkManager.safeCreate(path, data, zkAcl, CreateMode.PERSISTENT,
zkAcl, fencingNodePath);
}
private static String getNodePath(String root, String nodeName) { private static String getNodePath(String root, String nodeName) {
return ZKCuratorManager.getNodePath(root, nodeName); return ZKCuratorManager.getNodePath(root, nodeName);
} }
@ -256,4 +302,18 @@ private static Object deserializeObject(byte[] bytes) throws Exception {
return ois.readObject(); return ois.readObject();
} }
} }
/**
* Casts an object of type Object to type T. It is essential to emphasize,
* that it is an unsafe operation.
*
* @param o Object to be cast from
* @param <T> Type to cast to
* @return casted object of type T
* @throws ClassCastException
*/
@SuppressWarnings("unchecked")
private static <T> T unsafeCast(Object o) throws ClassCastException {
return (T)o;
}
} }

View File

@ -41,8 +41,6 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.YarnConfigurationStore.LogMutation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.YarnConfigurationStore.LogMutation;
import org.apache.hadoop.yarn.webapp.dao.QueueConfigInfo; import org.apache.hadoop.yarn.webapp.dao.QueueConfigInfo;
import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo; import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.ACL;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -50,9 +48,10 @@
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.IOException; import java.io.IOException;
import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
@ -114,18 +113,11 @@ public void testIncompatibleVersion() throws Exception {
confStore.initialize(conf, schedConf, rmContext); confStore.initialize(conf, schedConf, rmContext);
Version otherVersion = Version.newInstance(1, 1); Version otherVersion = Version.newInstance(1, 1);
String znodeParentPath = conf.get(YarnConfiguration. String zkVersionPath = getZkPath("VERSION");
RM_SCHEDCONF_STORE_ZK_PARENT_PATH,
YarnConfiguration.DEFAULT_RM_SCHEDCONF_STORE_ZK_PARENT_PATH);
String zkVersionPath = ZKCuratorManager.getNodePath(znodeParentPath,
"VERSION");
String fencingNodePath = ZKCuratorManager.getNodePath(znodeParentPath,
"FENCING");
byte[] versionData = byte[] versionData =
((VersionPBImpl) otherVersion).getProto().toByteArray(); ((VersionPBImpl) otherVersion).getProto().toByteArray();
List<ACL> zkAcl = ZKCuratorManager.getZKAcls(conf); ((ZKConfigurationStore) confStore).safeCreateZkData(zkVersionPath,
((ZKConfigurationStore) confStore).zkManager.safeCreate(zkVersionPath, versionData);
versionData, zkAcl, CreateMode.PERSISTENT, zkAcl, fencingNodePath);
assertEquals("The configuration store should have stored the new" + assertEquals("The configuration store should have stored the new" +
"version.", otherVersion, confStore.getConfStoreVersion()); "version.", otherVersion, confStore.getConfStoreVersion());
@ -141,20 +133,58 @@ public void testFormatConfiguration() throws Exception {
assertNull(confStore.retrieve()); assertNull(confStore.retrieve());
} }
@Test(expected = IllegalStateException.class)
public void testGetConfigurationVersionOnSerializedNullData()
throws Exception {
confStore.initialize(conf, schedConf, rmContext);
String confVersionPath = getZkPath("CONF_VERSION");
((ZKConfigurationStore) confStore).setZkData(confVersionPath, null);
confStore.getConfigVersion();
}
/**
* The correct behavior of logMutation should be, that even though an
* Exception is thrown during serialization, the log data must not be
* overridden.
*
* @throws Exception
*/
@Test(expected = ClassCastException.class)
public void testLogMutationAfterSerializationError() throws Exception {
byte[] data = null;
String logs = "NOT_LINKED_LIST";
confStore.initialize(conf, schedConf, rmContext);
try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(baos)) {
oos.writeObject(logs);
oos.flush();
baos.flush();
data = baos.toByteArray();
}
String logsPath = getZkPath("LOGS");
((ZKConfigurationStore)confStore).setZkData(logsPath, data);
Map<String, String> update = new HashMap<>();
update.put("valid_key", "valid_value");
confStore.logMutation(new LogMutation(update, TEST_USER));
assertEquals(data, ((ZKConfigurationStore)confStore).getZkData(logsPath));
}
@Test @Test
public void testDisableAuditLogs() throws Exception { public void testDisableAuditLogs() throws Exception {
conf.setLong(YarnConfiguration.RM_SCHEDCONF_MAX_LOGS, 0); conf.setLong(YarnConfiguration.RM_SCHEDCONF_MAX_LOGS, 0);
confStore.initialize(conf, schedConf, rmContext); confStore.initialize(conf, schedConf, rmContext);
String znodeParentPath = conf.get(YarnConfiguration. String logsPath = getZkPath("LOGS");
RM_SCHEDCONF_STORE_ZK_PARENT_PATH,
YarnConfiguration.DEFAULT_RM_SCHEDCONF_STORE_ZK_PARENT_PATH);
String logsPath = ZKCuratorManager.getNodePath(znodeParentPath, "LOGS");
byte[] data = null; byte[] data = null;
((ZKConfigurationStore) confStore).zkManager.setData(logsPath, data, -1); ((ZKConfigurationStore) confStore).setZkData(logsPath, data);
prepareLogMutation("key1", "val1"); prepareLogMutation("key1", "val1");
data = ((ZKConfigurationStore) confStore).zkManager.getData(logsPath); data = ((ZKConfigurationStore) confStore).getZkData(logsPath);
assertNull("Failed to Disable Audit Logs", data); assertNull("Failed to Disable Audit Logs", data);
} }
@ -376,6 +406,13 @@ public YarnConfigurationStore createConfStore() {
return new ZKConfigurationStore(); return new ZKConfigurationStore();
} }
private String getZkPath(String nodeName) {
String znodeParentPath = conf.get(YarnConfiguration.
RM_SCHEDCONF_STORE_ZK_PARENT_PATH,
YarnConfiguration.DEFAULT_RM_SCHEDCONF_STORE_ZK_PARENT_PATH);
return ZKCuratorManager.getNodePath(znodeParentPath, nodeName);
}
@Override @Override
Version getVersion() { Version getVersion() {
return ZKConfigurationStore.CURRENT_VERSION_INFO; return ZKConfigurationStore.CURRENT_VERSION_INFO;