diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml
index 0c7cfc864d..2c0f03a5f1 100644
--- a/hadoop-project/pom.xml
+++ b/hadoop-project/pom.xml
@@ -828,7 +828,6 @@
zookeeper
${zookeeper.version}
test-jar
- test
org.jboss.netty
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 884b506d86..91ce11f789 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -301,6 +301,9 @@ Release 2.7.0 - UNRELEASED
YARN-1299. Improve a log message in AppSchedulingInfo by adding application
id. (Ashutosh Jindal and Devaraj K via ozawa)
+ YARN-1514. Utility to benchmark ZKRMStateStore#loadState for RM HA.
+ (Tsuyoshi OZAWA via jianhe)
+
OPTIMIZATIONS
YARN-2990. FairScheduler's delay-scheduling always waits for node-local and
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml
index 9bcc7c879d..ff429ccd3f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml
@@ -186,7 +186,6 @@
org.apache.zookeeper
zookeeper
test-jar
- test
@@ -245,6 +244,13 @@
test-jar
test-compile
+
+
+
+ org.apache.hadoop.test.YarnTestDriver
+
+
+
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/test/YarnTestDriver.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/test/YarnTestDriver.java
new file mode 100644
index 0000000000..8874ed822d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/test/YarnTestDriver.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.test;
+
+import org.apache.hadoop.util.ProgramDriver;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.TestZKRMStateStorePerf;
+
+/**
+ * Driver for Yarn tests.
+ *
+ */
+public class YarnTestDriver {
+
+ private ProgramDriver pgd;
+
+ public YarnTestDriver() {
+ this(new ProgramDriver());
+ }
+
+ public YarnTestDriver(ProgramDriver pgd) {
+ this.pgd = pgd;
+ try {
+ pgd.addClass(TestZKRMStateStorePerf.class.getSimpleName(),
+ TestZKRMStateStorePerf.class,
+ "ZKRMStateStore i/o benchmark.");
+ } catch(Throwable e) {
+ e.printStackTrace();
+ }
+ }
+
+ public void run(String argv[]) {
+ int exitCode = -1;
+ try {
+ exitCode = pgd.run(argv);
+ } catch(Throwable e) {
+ e.printStackTrace();
+ }
+ System.exit(exitCode);
+ }
+
+ public static void main(String argv[]){
+ new YarnTestDriver().run(argv);
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
index b01969bd08..5b53a0235a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
+import org.apache.hadoop.yarn.event.Event;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@@ -79,8 +80,7 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{
public static final Log LOG = LogFactory.getLog(RMStateStoreTestBase.class);
- static class TestDispatcher implements
- Dispatcher, EventHandler {
+ static class TestDispatcher implements Dispatcher, EventHandler {
ApplicationAttemptId attemptId;
@@ -93,8 +93,11 @@ public void register(Class extends Enum> eventType,
}
@Override
- public void handle(RMAppAttemptEvent event) {
- assertEquals(attemptId, event.getApplicationAttemptId());
+ public void handle(Event event) {
+ if (event instanceof RMAppAttemptEvent) {
+ RMAppAttemptEvent rmAppAttemptEvent = (RMAppAttemptEvent) event;
+ assertEquals(attemptId, rmAppAttemptEvent.getApplicationAttemptId());
+ }
notified = true;
synchronized (this) {
notifyAll();
@@ -134,7 +137,8 @@ void waitNotify(TestDispatcher dispatcher) {
dispatcher.notified = false;
}
- RMApp storeApp(RMStateStore store, ApplicationId appId, long submitTime,
+ protected RMApp storeApp(RMStateStore store, ApplicationId appId,
+ long submitTime,
long startTime) throws Exception {
ApplicationSubmissionContext context =
new ApplicationSubmissionContextPBImpl();
@@ -150,7 +154,8 @@ RMApp storeApp(RMStateStore store, ApplicationId appId, long submitTime,
return mockApp;
}
- ContainerId storeAttempt(RMStateStore store, ApplicationAttemptId attemptId,
+ protected ContainerId storeAttempt(RMStateStore store,
+ ApplicationAttemptId attemptId,
String containerIdStr, Token appToken,
SecretKey clientTokenMasterKey, TestDispatcher dispatcher)
throws Exception {
@@ -474,7 +479,7 @@ public void testRMDTSecretManagerStateStore(
}
- private Token generateAMRMToken(
+ protected Token generateAMRMToken(
ApplicationAttemptId attemptId,
AMRMTokenSecretManager appTokenMgr) {
Token appToken =
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStorePerf.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStorePerf.java
new file mode 100644
index 0000000000..654b357298
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStorePerf.java
@@ -0,0 +1,277 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.recovery;
+
+import com.google.common.base.Optional;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Set;
+import javax.crypto.SecretKey;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
+import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.junit.Before;
+import org.junit.After;
+import org.junit.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestZKRMStateStorePerf extends RMStateStoreTestBase
+ implements Tool {
+ public static final Log LOG = LogFactory.getLog(TestZKRMStateStore.class);
+
+ final String version = "0.1";
+
+ // Configurable variables for performance test
+ private int ZK_PERF_NUM_APP_DEFAULT = 1000;
+ private int ZK_PERF_NUM_APPATTEMPT_PER_APP = 10;
+
+ private final long clusterTimeStamp = 1352994193343L;
+
+ private static final String USAGE =
+ "Usage: " + TestZKRMStateStorePerf.class.getSimpleName() +
+ " -appSize numberOfApplications" +
+ " -appAttemptSize numberOfApplicationAttempts" +
+ " [-hostPort Host:Port]" +
+ " [-workingZnode rootZnodeForTesting]\n";
+
+ private YarnConfiguration conf = null;
+ private String workingZnode = "/Test";
+ private ZKRMStateStore store;
+ private AMRMTokenSecretManager appTokenMgr;
+ private ClientToAMTokenSecretManagerInRM clientToAMTokenMgr;
+
+ @Before
+ public void setUpZKServer() throws Exception {
+ super.setUp();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ if (store != null) {
+ store.stop();
+ }
+ if (appTokenMgr != null) {
+ appTokenMgr.stop();
+ }
+ super.tearDown();
+ }
+
+ private void initStore(String hostPort) {
+ Optional optHostPort = Optional.fromNullable(hostPort);
+ RMContext rmContext = mock(RMContext.class);
+
+ conf = new YarnConfiguration();
+ conf.set(YarnConfiguration.RM_ZK_ADDRESS, optHostPort.or(this.hostPort));
+ conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode);
+
+ store = new ZKRMStateStore();
+ store.init(conf);
+ store.start();
+ when(rmContext.getStateStore()).thenReturn(store);
+ appTokenMgr = new AMRMTokenSecretManager(conf, rmContext);
+ appTokenMgr.start();
+ clientToAMTokenMgr = new ClientToAMTokenSecretManagerInRM();
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public int run(String[] args) {
+ LOG.info("Starting ZKRMStateStorePerf ver." + version);
+
+ int numApp = ZK_PERF_NUM_APP_DEFAULT;
+ int numAppAttemptPerApp = ZK_PERF_NUM_APPATTEMPT_PER_APP;
+ String hostPort = null;
+ boolean launchLocalZK= true;
+
+ if (args.length == 0) {
+ System.err.println("Missing arguments.");
+ return -1;
+ }
+
+ for (int i = 0; i < args.length; i++) { // parse command line
+ if (args[i].equalsIgnoreCase("-appsize")) {
+ numApp = Integer.parseInt(args[++i]);
+ } else if (args[i].equalsIgnoreCase("-appattemptsize")) {
+ numAppAttemptPerApp = Integer.parseInt(args[++i]);
+ } else if (args[i].equalsIgnoreCase("-hostPort")) {
+ hostPort = args[++i];
+ launchLocalZK = false;
+ } else if (args[i].equalsIgnoreCase("-workingZnode")) {
+ workingZnode = args[++i];
+ } else {
+ System.err.println("Illegal argument: " + args[i]);
+ return -1;
+ }
+ }
+
+ if (launchLocalZK) {
+ try {
+ setUp();
+ } catch (Exception e) {
+ System.err.println("failed to setup. : " + e.getMessage());
+ return -1;
+ }
+ }
+
+ initStore(hostPort);
+
+ long submitTime = System.currentTimeMillis();
+ long startTime = System.currentTimeMillis() + 1234;
+
+ ArrayList applicationIds = new ArrayList<>();
+ ArrayList rmApps = new ArrayList<>();
+ ArrayList attemptIds = new ArrayList<>();
+ HashMap> appIdsToAttemptId =
+ new HashMap<>();
+ TestDispatcher dispatcher = new TestDispatcher();
+ store.setRMDispatcher(dispatcher);
+
+ for (int i = 0; i < numApp; i++) {
+ ApplicationId appId = ApplicationId.newInstance(clusterTimeStamp, i);
+ applicationIds.add(appId);
+ ArrayList attemptIdsForThisApp =
+ new ArrayList<>();
+ for (int j = 0; j < numAppAttemptPerApp; j++) {
+ ApplicationAttemptId attemptId =
+ ApplicationAttemptId.newInstance(appId, j);
+ attemptIdsForThisApp.add(attemptId);
+ }
+ appIdsToAttemptId.put(appId, new LinkedHashSet(attemptIdsForThisApp));
+ attemptIds.addAll(attemptIdsForThisApp);
+ }
+
+ for (ApplicationId appId : applicationIds) {
+ RMApp app = null;
+ try {
+ app = storeApp(store, appId, submitTime, startTime);
+ } catch (Exception e) {
+ System.err.println("failed to create Application Znode. : "
+ + e.getMessage());
+ return -1;
+ }
+ waitNotify(dispatcher);
+ rmApps.add(app);
+ }
+
+ for (ApplicationAttemptId attemptId : attemptIds) {
+ Token tokenId =
+ generateAMRMToken(attemptId, appTokenMgr);
+ SecretKey clientTokenKey =
+ clientToAMTokenMgr.createMasterKey(attemptId);
+ try {
+ storeAttempt(store, attemptId,
+ ContainerId.newContainerId(attemptId, 0L).toString(),
+ tokenId, clientTokenKey, dispatcher);
+ } catch (Exception e) {
+ System.err.println("failed to create AppAttempt Znode. : "
+ + e.getMessage());
+ return -1;
+ }
+ }
+
+ long storeStart = System.currentTimeMillis();
+ try {
+ store.loadState();
+ } catch (Exception e) {
+ System.err.println("failed to locaState from ZKRMStateStore. : "
+ + e.getMessage());
+ return -1;
+ }
+ long storeEnd = System.currentTimeMillis();
+
+ long loadTime = storeEnd - storeStart;
+
+ String resultMsg = "ZKRMStateStore takes " + loadTime + " msec to loadState.";
+ LOG.info(resultMsg);
+ System.out.println(resultMsg);
+
+ // cleanup
+ try {
+ for (RMApp app : rmApps) {
+ ApplicationStateData appState =
+ ApplicationStateData.newInstance(app.getSubmitTime(),
+ app.getStartTime(), app.getApplicationSubmissionContext(),
+ app.getUser());
+ ApplicationId appId = app.getApplicationId();
+ Map m = mock(Map.class);
+ when(m.keySet()).thenReturn(appIdsToAttemptId.get(appId));
+ appState.attempts = m;
+ store.removeApplicationStateInternal(appState);
+ }
+ } catch (Exception e) {
+ System.err.println("failed to cleanup. : " + e.getMessage());
+ return -1;
+ }
+
+ return 0;
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ // currently this function is just ignored
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @Test
+ public void perfZKRMStateStore() throws Exception {
+ String[] args = {
+ "-appSize", String.valueOf(ZK_PERF_NUM_APP_DEFAULT),
+ "-appAttemptSize", String.valueOf(ZK_PERF_NUM_APPATTEMPT_PER_APP)
+ };
+ run(args);
+ }
+
+ static public void main(String[] args) throws Exception {
+ TestZKRMStateStorePerf perf = new TestZKRMStateStorePerf();
+
+ int res = -1;
+ try {
+ res = ToolRunner.run(perf, args);
+ } catch(Exception e) {
+ System.err.print(StringUtils.stringifyException(e));
+ res = -2;
+ }
+ if(res == -1) {
+ System.err.print(USAGE);
+ }
+ System.exit(res);
+ }
+}