From 5cc6f945da648a48441b00aca89f3cc1a9777143 Mon Sep 17 00:00:00 2001 From: Szilard Nemeth Date: Thu, 5 Dec 2019 20:43:26 +0100 Subject: [PATCH] YARN-9789. Disable Option for Write Ahead Logs of LogMutation. Contributed by Prabhu Joseph --- .../conf/LeveldbConfigurationStore.java | 17 +++++++--- .../capacity/conf/ZKConfigurationStore.java | 22 +++++++------ .../conf/TestLeveldbConfigurationStore.java | 31 +++++++++++++++++++ .../conf/TestZKConfigurationStore.java | 23 ++++++++++++++ 4 files changed, 78 insertions(+), 15 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java index 2966c948d2..39cd8ff9f2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java @@ -224,12 +224,14 @@ public void close() throws IOException { @Override public void logMutation(LogMutation logMutation) throws IOException { - LinkedList logs = deserLogMutations(db.get(bytes(LOG_KEY))); - logs.add(logMutation); - if (logs.size() > maxLogs) { - logs.removeFirst(); + if (maxLogs > 0) { + LinkedList logs = deserLogMutations(db.get(bytes(LOG_KEY))); + logs.add(logMutation); + if (logs.size() > maxLogs) { + logs.removeFirst(); + } + db.put(bytes(LOG_KEY), serLogMutations(logs)); } - db.put(bytes(LOG_KEY), serLogMutations(logs)); pendingMutation = logMutation; } @@ -337,6 +339,11 @@ protected LinkedList getLogs() throws Exception { return deserLogMutations(db.get(bytes(LOG_KEY))); } + @VisibleForTesting + protected DB getDB() { + return db; + } + @Override public void storeVersion() throws Exception { String key = VERSION_KEY; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ZKConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ZKConfigurationStore.java index 1aee4159a3..75ae727dff 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ZKConfigurationStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ZKConfigurationStore.java @@ -162,17 +162,19 @@ public synchronized void storeVersion() throws Exception { @Override public void logMutation(LogMutation logMutation) throws Exception { - byte[] storedLogs = zkManager.getData(logsPath); - LinkedList logs = new LinkedList<>(); - if (storedLogs != null) { - logs = (LinkedList) deserializeObject(storedLogs); + if (maxLogs > 0) { + byte[] storedLogs = zkManager.getData(logsPath); + LinkedList logs = new LinkedList<>(); + if (storedLogs != null) { + logs = (LinkedList) deserializeObject(storedLogs); + } + logs.add(logMutation); + if (logs.size() > maxLogs) { + logs.remove(logs.removeFirst()); + } + zkManager.safeSetData(logsPath, serializeObject(logs), -1, zkAcl, + fencingNodePath); } - logs.add(logMutation); - if (logs.size() > maxLogs) { - logs.remove(logs.removeFirst()); - } - zkManager.safeSetData(logsPath, serializeObject(logs), -1, zkAcl, - fencingNodePath); pendingMutation = logMutation; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestLeveldbConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestLeveldbConfigurationStore.java index 3381637fb8..5f78aa2807 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestLeveldbConfigurationStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestLeveldbConfigurationStore.java @@ -31,13 +31,17 @@ import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo; import org.junit.Before; import org.junit.Test; +import org.iq80.leveldb.DB; +import org.iq80.leveldb.DBIterator; import java.io.File; +import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.LinkedList; import java.util.Map; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; /** @@ -112,6 +116,33 @@ public void testPersistUpdatedConfiguration() throws Exception { confStore.close(); } + @Test + public void testDisableAuditLogs() throws Exception { + conf.setLong(YarnConfiguration.RM_SCHEDCONF_MAX_LOGS, 0); + confStore.initialize(conf, schedConf, rmContext); + + Map update = new HashMap<>(); + update.put("key1", "val1"); + YarnConfigurationStore.LogMutation mutation = + new YarnConfigurationStore.LogMutation(update, TEST_USER); + confStore.logMutation(mutation); + + boolean logKeyPresent = false; + DB db = ((LeveldbConfigurationStore) confStore).getDB(); + DBIterator itr = db.iterator(); + itr.seekToFirst(); + while (itr.hasNext()) { + Map.Entry entry = itr.next(); + String key = new String(entry.getKey(), StandardCharsets.UTF_8); + if (key.equals("log")) { + logKeyPresent = true; + break; + } + } + assertFalse("Audit Log is not disabled", logKeyPresent); + confStore.close(); + } + @Test public void testMaxLogs() throws Exception { conf.setLong(YarnConfiguration.RM_SCHEDCONF_MAX_LOGS, 2); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestZKConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestZKConfigurationStore.java index eae80d500e..57ccd75d58 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestZKConfigurationStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestZKConfigurationStore.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf; +import org.apache.hadoop.util.curator.ZKCuratorManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.curator.framework.CuratorFramework; @@ -221,6 +222,28 @@ public void testMaxLogs() throws Exception { assertEquals("val3", logs.get(1).getUpdates().get("key3")); } + + @Test + public void testDisableAuditLogs() throws Exception { + conf.setLong(YarnConfiguration.RM_SCHEDCONF_MAX_LOGS, 0); + confStore.initialize(conf, schedConf, rmContext); + String znodeParentPath = conf.get(YarnConfiguration. + RM_SCHEDCONF_STORE_ZK_PARENT_PATH, + YarnConfiguration.DEFAULT_RM_SCHEDCONF_STORE_ZK_PARENT_PATH); + String logsPath = ZKCuratorManager.getNodePath(znodeParentPath, "LOGS"); + byte[] data = null; + ((ZKConfigurationStore) confStore).zkManager.setData(logsPath, data, -1); + + Map update = new HashMap<>(); + update.put("key1", "val1"); + YarnConfigurationStore.LogMutation mutation = + new YarnConfigurationStore.LogMutation(update, TEST_USER); + confStore.logMutation(mutation); + + data = ((ZKConfigurationStore) confStore).zkManager.getData(logsPath); + assertNull("Failed to Disable Audit Logs", data); + } + public Configuration createRMHAConf(String rmIds, String rmId, int adminPort) { Configuration conf = new YarnConfiguration();