From e3579a8c3b1dc58a38859b189973be5a2d23f730 Mon Sep 17 00:00:00 2001 From: Xuan Date: Fri, 24 Feb 2017 15:58:12 -0800 Subject: [PATCH] YARN-5946: Create YarnConfigurationStore interface and InMemoryConfigurationStore class. Contributed by Jonathan Hung --- .../conf/InMemoryConfigurationStore.java | 86 ++++++++++ .../capacity/conf/YarnConfigurationStore.java | 154 ++++++++++++++++++ .../conf/TestYarnConfigurationStore.java | 70 ++++++++ 3 files changed, 310 insertions(+) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/InMemoryConfigurationStore.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestYarnConfigurationStore.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/InMemoryConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/InMemoryConfigurationStore.java new file mode 100644 index 0000000000..a208fb9a96 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/InMemoryConfigurationStore.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf; + +import org.apache.hadoop.conf.Configuration; + +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +/** + * A default implementation of {@link YarnConfigurationStore}. Doesn't offer + * persistent configuration storage, just stores the configuration in memory. + */ +public class InMemoryConfigurationStore implements YarnConfigurationStore { + + private Configuration schedConf; + private LinkedList pendingMutations; + private long pendingId; + + @Override + public void initialize(Configuration conf, Configuration schedConf) { + this.schedConf = schedConf; + this.pendingMutations = new LinkedList<>(); + this.pendingId = 0; + } + + @Override + public synchronized long logMutation(LogMutation logMutation) { + logMutation.setId(++pendingId); + pendingMutations.add(logMutation); + return pendingId; + } + + @Override + public synchronized boolean confirmMutation(long id, boolean isValid) { + LogMutation mutation = pendingMutations.poll(); + // If confirmMutation is called out of order, discard mutations until id + // is reached. + while (mutation != null) { + if (mutation.getId() == id) { + if (isValid) { + Map mutations = mutation.getUpdates(); + for (Map.Entry kv : mutations.entrySet()) { + schedConf.set(kv.getKey(), kv.getValue()); + } + } + return true; + } + mutation = pendingMutations.poll(); + } + return false; + } + + @Override + public synchronized Configuration retrieve() { + return schedConf; + } + + @Override + public synchronized List getPendingMutations() { + return pendingMutations; + } + + @Override + public List getConfirmedConfHistory(long fromId) { + // Unimplemented. + return null; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java new file mode 100644 index 0000000000..22c0ef8f5e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java @@ -0,0 +1,154 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; + +import java.util.List; +import java.util.Map; + +/** + * YarnConfigurationStore exposes the methods needed for retrieving and + * persisting {@link CapacityScheduler} configuration via key-value + * using write-ahead logging. When configuration mutation is requested, caller + * should first log it with {@code logMutation}, which persists this pending + * mutation. This mutation is merged to the persisted configuration only after + * {@code confirmMutation} is called. + * + * On startup/recovery, caller should call {@code retrieve} to get all + * confirmed mutations, then get pending mutations which were not confirmed via + * {@code getPendingMutations}, and replay/confirm them via + * {@code confirmMutation} as in the normal case. + */ +public interface YarnConfigurationStore { + + /** + * LogMutation encapsulates the fields needed for configuration mutation + * audit logging and recovery. + */ + class LogMutation { + private Map updates; + private String user; + private long id; + + /** + * Create log mutation prior to logging. + * @param updates key-value configuration updates + * @param user user who requested configuration change + */ + public LogMutation(Map updates, String user) { + this(updates, user, 0); + } + + /** + * Create log mutation for recovery. + * @param updates key-value configuration updates + * @param user user who requested configuration change + * @param id transaction id of configuration change + */ + LogMutation(Map updates, String user, long id) { + this.updates = updates; + this.user = user; + this.id = id; + } + + /** + * Get key-value configuration updates. + * @return map of configuration updates + */ + public Map getUpdates() { + return updates; + } + + /** + * Get user who requested configuration change. + * @return user who requested configuration change + */ + public String getUser() { + return user; + } + + /** + * Get transaction id of this configuration change. + * @return transaction id + */ + public long getId() { + return id; + } + + /** + * Set transaction id of this configuration change. + * @param id transaction id + */ + public void setId(long id) { + this.id = id; + } + } + + /** + * Initialize the configuration store. + * @param conf configuration to initialize store with + * @param schedConf Initial key-value configuration to persist + */ + void initialize(Configuration conf, Configuration schedConf); + + /** + * Logs the configuration change to backing store. Generates an id associated + * with this mutation, sets it in {@code logMutation}, and returns it. + * @param logMutation configuration change to be persisted in write ahead log + * @return id which configuration store associates with this mutation + */ + long logMutation(LogMutation logMutation); + + /** + * Should be called after {@code logMutation}. Gets the pending mutation + * associated with {@code id} and marks the mutation as persisted (no longer + * pending). If isValid is true, merge the mutation with the persisted + * configuration. + * + * If {@code confirmMutation} is called with ids in a different order than + * was returned by {@code logMutation}, the result is implementation + * dependent. + * @param id id of mutation to be confirmed + * @param isValid if true, update persisted configuration with mutation + * associated with {@code id}. + * @return true on success + */ + boolean confirmMutation(long id, boolean isValid); + + /** + * Retrieve the persisted configuration. + * @return configuration as key-value + */ + Configuration retrieve(); + + /** + * Get the list of pending mutations, in the order they were logged. + * @return list of mutations + */ + List getPendingMutations(); + + /** + * Get a list of confirmed configuration mutations starting from a given id. + * @param fromId id from which to start getting mutations, inclusive + * @return list of configuration mutations + */ + List getConfirmedConfHistory(long fromId); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestYarnConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestYarnConfigurationStore.java new file mode 100644 index 0000000000..dff4e77470 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestYarnConfigurationStore.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.YarnConfigurationStore.LogMutation; +import org.junit.Before; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +public class TestYarnConfigurationStore { + + private YarnConfigurationStore confStore; + private Configuration schedConf; + + private static final String testUser = "testUser"; + + @Before + public void setUp() { + schedConf = new Configuration(false); + schedConf.set("key1", "val1"); + } + + @Test + public void testInMemoryConfigurationStore() { + confStore = new InMemoryConfigurationStore(); + confStore.initialize(new Configuration(), schedConf); + assertEquals("val1", confStore.retrieve().get("key1")); + + Map update1 = new HashMap<>(); + update1.put("keyUpdate1", "valUpdate1"); + LogMutation mutation1 = new LogMutation(update1, testUser); + long id = confStore.logMutation(mutation1); + assertEquals(1, confStore.getPendingMutations().size()); + confStore.confirmMutation(id, true); + assertEquals("valUpdate1", confStore.retrieve().get("keyUpdate1")); + assertEquals(0, confStore.getPendingMutations().size()); + + Map update2 = new HashMap<>(); + update2.put("keyUpdate2", "valUpdate2"); + LogMutation mutation2 = new LogMutation(update2, testUser); + id = confStore.logMutation(mutation2); + assertEquals(1, confStore.getPendingMutations().size()); + confStore.confirmMutation(id, false); + assertNull("Configuration should not be updated", + confStore.retrieve().get("keyUpdate2")); + assertEquals(0, confStore.getPendingMutations().size()); + } +}