YARN-1514. Utility to benchmark ZKRMStateStore#loadState for RM HA. Contributed by Tsuyoshi OZAWA
This commit is contained in:
parent
9a3e292087
commit
1c03376300
@ -828,7 +828,6 @@
|
||||
<artifactId>zookeeper</artifactId>
|
||||
<version>${zookeeper.version}</version>
|
||||
<type>test-jar</type>
|
||||
<scope>test</scope>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>org.jboss.netty</groupId>
|
||||
|
@ -301,6 +301,9 @@ Release 2.7.0 - UNRELEASED
|
||||
YARN-1299. Improve a log message in AppSchedulingInfo by adding application
|
||||
id. (Ashutosh Jindal and Devaraj K via ozawa)
|
||||
|
||||
YARN-1514. Utility to benchmark ZKRMStateStore#loadState for RM HA.
|
||||
(Tsuyoshi OZAWA via jianhe)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
YARN-2990. FairScheduler's delay-scheduling always waits for node-local and
|
||||
|
@ -186,7 +186,6 @@
|
||||
<groupId>org.apache.zookeeper</groupId>
|
||||
<artifactId>zookeeper</artifactId>
|
||||
<type>test-jar</type>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
|
||||
<dependency>
|
||||
@ -245,6 +244,13 @@
|
||||
<goal>test-jar</goal>
|
||||
</goals>
|
||||
<phase>test-compile</phase>
|
||||
<configuration>
|
||||
<archive>
|
||||
<manifest>
|
||||
<mainClass>org.apache.hadoop.test.YarnTestDriver</mainClass>
|
||||
</manifest>
|
||||
</archive>
|
||||
</configuration>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
|
@ -0,0 +1,60 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.test;
|
||||
|
||||
import org.apache.hadoop.util.ProgramDriver;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.TestZKRMStateStorePerf;
|
||||
|
||||
/**
|
||||
* Driver for Yarn tests.
|
||||
*
|
||||
*/
|
||||
public class YarnTestDriver {
|
||||
|
||||
private ProgramDriver pgd;
|
||||
|
||||
public YarnTestDriver() {
|
||||
this(new ProgramDriver());
|
||||
}
|
||||
|
||||
public YarnTestDriver(ProgramDriver pgd) {
|
||||
this.pgd = pgd;
|
||||
try {
|
||||
pgd.addClass(TestZKRMStateStorePerf.class.getSimpleName(),
|
||||
TestZKRMStateStorePerf.class,
|
||||
"ZKRMStateStore i/o benchmark.");
|
||||
} catch(Throwable e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
public void run(String argv[]) {
|
||||
int exitCode = -1;
|
||||
try {
|
||||
exitCode = pgd.run(argv);
|
||||
} catch(Throwable e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
System.exit(exitCode);
|
||||
}
|
||||
|
||||
public static void main(String argv[]){
|
||||
new YarnTestDriver().run(argv);
|
||||
}
|
||||
}
|
@ -18,6 +18,7 @@
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
|
||||
|
||||
import org.apache.hadoop.yarn.event.Event;
|
||||
import static org.junit.Assert.assertArrayEquals;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
@ -79,8 +80,7 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{
|
||||
|
||||
public static final Log LOG = LogFactory.getLog(RMStateStoreTestBase.class);
|
||||
|
||||
static class TestDispatcher implements
|
||||
Dispatcher, EventHandler<RMAppAttemptEvent> {
|
||||
static class TestDispatcher implements Dispatcher, EventHandler<Event> {
|
||||
|
||||
ApplicationAttemptId attemptId;
|
||||
|
||||
@ -93,8 +93,11 @@ public void register(Class<? extends Enum> eventType,
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handle(RMAppAttemptEvent event) {
|
||||
assertEquals(attemptId, event.getApplicationAttemptId());
|
||||
public void handle(Event event) {
|
||||
if (event instanceof RMAppAttemptEvent) {
|
||||
RMAppAttemptEvent rmAppAttemptEvent = (RMAppAttemptEvent) event;
|
||||
assertEquals(attemptId, rmAppAttemptEvent.getApplicationAttemptId());
|
||||
}
|
||||
notified = true;
|
||||
synchronized (this) {
|
||||
notifyAll();
|
||||
@ -134,7 +137,8 @@ void waitNotify(TestDispatcher dispatcher) {
|
||||
dispatcher.notified = false;
|
||||
}
|
||||
|
||||
RMApp storeApp(RMStateStore store, ApplicationId appId, long submitTime,
|
||||
protected RMApp storeApp(RMStateStore store, ApplicationId appId,
|
||||
long submitTime,
|
||||
long startTime) throws Exception {
|
||||
ApplicationSubmissionContext context =
|
||||
new ApplicationSubmissionContextPBImpl();
|
||||
@ -150,7 +154,8 @@ RMApp storeApp(RMStateStore store, ApplicationId appId, long submitTime,
|
||||
return mockApp;
|
||||
}
|
||||
|
||||
ContainerId storeAttempt(RMStateStore store, ApplicationAttemptId attemptId,
|
||||
protected ContainerId storeAttempt(RMStateStore store,
|
||||
ApplicationAttemptId attemptId,
|
||||
String containerIdStr, Token<AMRMTokenIdentifier> appToken,
|
||||
SecretKey clientTokenMasterKey, TestDispatcher dispatcher)
|
||||
throws Exception {
|
||||
@ -474,7 +479,7 @@ public void testRMDTSecretManagerStateStore(
|
||||
|
||||
}
|
||||
|
||||
private Token<AMRMTokenIdentifier> generateAMRMToken(
|
||||
protected Token<AMRMTokenIdentifier> generateAMRMToken(
|
||||
ApplicationAttemptId attemptId,
|
||||
AMRMTokenSecretManager appTokenMgr) {
|
||||
Token<AMRMTokenIdentifier> appToken =
|
||||
|
@ -0,0 +1,277 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
|
||||
|
||||
import com.google.common.base.Optional;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedHashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import javax.crypto.SecretKey;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.hadoop.util.Tool;
|
||||
import org.apache.hadoop.util.ToolRunner;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.junit.Before;
|
||||
import org.junit.After;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
public class TestZKRMStateStorePerf extends RMStateStoreTestBase
|
||||
implements Tool {
|
||||
public static final Log LOG = LogFactory.getLog(TestZKRMStateStore.class);
|
||||
|
||||
final String version = "0.1";
|
||||
|
||||
// Configurable variables for performance test
|
||||
private int ZK_PERF_NUM_APP_DEFAULT = 1000;
|
||||
private int ZK_PERF_NUM_APPATTEMPT_PER_APP = 10;
|
||||
|
||||
private final long clusterTimeStamp = 1352994193343L;
|
||||
|
||||
private static final String USAGE =
|
||||
"Usage: " + TestZKRMStateStorePerf.class.getSimpleName() +
|
||||
" -appSize numberOfApplications" +
|
||||
" -appAttemptSize numberOfApplicationAttempts" +
|
||||
" [-hostPort Host:Port]" +
|
||||
" [-workingZnode rootZnodeForTesting]\n";
|
||||
|
||||
private YarnConfiguration conf = null;
|
||||
private String workingZnode = "/Test";
|
||||
private ZKRMStateStore store;
|
||||
private AMRMTokenSecretManager appTokenMgr;
|
||||
private ClientToAMTokenSecretManagerInRM clientToAMTokenMgr;
|
||||
|
||||
@Before
|
||||
public void setUpZKServer() throws Exception {
|
||||
super.setUp();
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
if (store != null) {
|
||||
store.stop();
|
||||
}
|
||||
if (appTokenMgr != null) {
|
||||
appTokenMgr.stop();
|
||||
}
|
||||
super.tearDown();
|
||||
}
|
||||
|
||||
private void initStore(String hostPort) {
|
||||
Optional<String> optHostPort = Optional.fromNullable(hostPort);
|
||||
RMContext rmContext = mock(RMContext.class);
|
||||
|
||||
conf = new YarnConfiguration();
|
||||
conf.set(YarnConfiguration.RM_ZK_ADDRESS, optHostPort.or(this.hostPort));
|
||||
conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode);
|
||||
|
||||
store = new ZKRMStateStore();
|
||||
store.init(conf);
|
||||
store.start();
|
||||
when(rmContext.getStateStore()).thenReturn(store);
|
||||
appTokenMgr = new AMRMTokenSecretManager(conf, rmContext);
|
||||
appTokenMgr.start();
|
||||
clientToAMTokenMgr = new ClientToAMTokenSecretManagerInRM();
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public int run(String[] args) {
|
||||
LOG.info("Starting ZKRMStateStorePerf ver." + version);
|
||||
|
||||
int numApp = ZK_PERF_NUM_APP_DEFAULT;
|
||||
int numAppAttemptPerApp = ZK_PERF_NUM_APPATTEMPT_PER_APP;
|
||||
String hostPort = null;
|
||||
boolean launchLocalZK= true;
|
||||
|
||||
if (args.length == 0) {
|
||||
System.err.println("Missing arguments.");
|
||||
return -1;
|
||||
}
|
||||
|
||||
for (int i = 0; i < args.length; i++) { // parse command line
|
||||
if (args[i].equalsIgnoreCase("-appsize")) {
|
||||
numApp = Integer.parseInt(args[++i]);
|
||||
} else if (args[i].equalsIgnoreCase("-appattemptsize")) {
|
||||
numAppAttemptPerApp = Integer.parseInt(args[++i]);
|
||||
} else if (args[i].equalsIgnoreCase("-hostPort")) {
|
||||
hostPort = args[++i];
|
||||
launchLocalZK = false;
|
||||
} else if (args[i].equalsIgnoreCase("-workingZnode")) {
|
||||
workingZnode = args[++i];
|
||||
} else {
|
||||
System.err.println("Illegal argument: " + args[i]);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
if (launchLocalZK) {
|
||||
try {
|
||||
setUp();
|
||||
} catch (Exception e) {
|
||||
System.err.println("failed to setup. : " + e.getMessage());
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
initStore(hostPort);
|
||||
|
||||
long submitTime = System.currentTimeMillis();
|
||||
long startTime = System.currentTimeMillis() + 1234;
|
||||
|
||||
ArrayList<ApplicationId> applicationIds = new ArrayList<>();
|
||||
ArrayList<RMApp> rmApps = new ArrayList<>();
|
||||
ArrayList<ApplicationAttemptId> attemptIds = new ArrayList<>();
|
||||
HashMap<ApplicationId, Set<ApplicationAttemptId>> appIdsToAttemptId =
|
||||
new HashMap<>();
|
||||
TestDispatcher dispatcher = new TestDispatcher();
|
||||
store.setRMDispatcher(dispatcher);
|
||||
|
||||
for (int i = 0; i < numApp; i++) {
|
||||
ApplicationId appId = ApplicationId.newInstance(clusterTimeStamp, i);
|
||||
applicationIds.add(appId);
|
||||
ArrayList<ApplicationAttemptId> attemptIdsForThisApp =
|
||||
new ArrayList<>();
|
||||
for (int j = 0; j < numAppAttemptPerApp; j++) {
|
||||
ApplicationAttemptId attemptId =
|
||||
ApplicationAttemptId.newInstance(appId, j);
|
||||
attemptIdsForThisApp.add(attemptId);
|
||||
}
|
||||
appIdsToAttemptId.put(appId, new LinkedHashSet(attemptIdsForThisApp));
|
||||
attemptIds.addAll(attemptIdsForThisApp);
|
||||
}
|
||||
|
||||
for (ApplicationId appId : applicationIds) {
|
||||
RMApp app = null;
|
||||
try {
|
||||
app = storeApp(store, appId, submitTime, startTime);
|
||||
} catch (Exception e) {
|
||||
System.err.println("failed to create Application Znode. : "
|
||||
+ e.getMessage());
|
||||
return -1;
|
||||
}
|
||||
waitNotify(dispatcher);
|
||||
rmApps.add(app);
|
||||
}
|
||||
|
||||
for (ApplicationAttemptId attemptId : attemptIds) {
|
||||
Token<AMRMTokenIdentifier> tokenId =
|
||||
generateAMRMToken(attemptId, appTokenMgr);
|
||||
SecretKey clientTokenKey =
|
||||
clientToAMTokenMgr.createMasterKey(attemptId);
|
||||
try {
|
||||
storeAttempt(store, attemptId,
|
||||
ContainerId.newContainerId(attemptId, 0L).toString(),
|
||||
tokenId, clientTokenKey, dispatcher);
|
||||
} catch (Exception e) {
|
||||
System.err.println("failed to create AppAttempt Znode. : "
|
||||
+ e.getMessage());
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
long storeStart = System.currentTimeMillis();
|
||||
try {
|
||||
store.loadState();
|
||||
} catch (Exception e) {
|
||||
System.err.println("failed to locaState from ZKRMStateStore. : "
|
||||
+ e.getMessage());
|
||||
return -1;
|
||||
}
|
||||
long storeEnd = System.currentTimeMillis();
|
||||
|
||||
long loadTime = storeEnd - storeStart;
|
||||
|
||||
String resultMsg = "ZKRMStateStore takes " + loadTime + " msec to loadState.";
|
||||
LOG.info(resultMsg);
|
||||
System.out.println(resultMsg);
|
||||
|
||||
// cleanup
|
||||
try {
|
||||
for (RMApp app : rmApps) {
|
||||
ApplicationStateData appState =
|
||||
ApplicationStateData.newInstance(app.getSubmitTime(),
|
||||
app.getStartTime(), app.getApplicationSubmissionContext(),
|
||||
app.getUser());
|
||||
ApplicationId appId = app.getApplicationId();
|
||||
Map m = mock(Map.class);
|
||||
when(m.keySet()).thenReturn(appIdsToAttemptId.get(appId));
|
||||
appState.attempts = m;
|
||||
store.removeApplicationStateInternal(appState);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
System.err.println("failed to cleanup. : " + e.getMessage());
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setConf(Configuration conf) {
|
||||
// currently this function is just ignored
|
||||
}
|
||||
|
||||
@Override
|
||||
public Configuration getConf() {
|
||||
return conf;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void perfZKRMStateStore() throws Exception {
|
||||
String[] args = {
|
||||
"-appSize", String.valueOf(ZK_PERF_NUM_APP_DEFAULT),
|
||||
"-appAttemptSize", String.valueOf(ZK_PERF_NUM_APPATTEMPT_PER_APP)
|
||||
};
|
||||
run(args);
|
||||
}
|
||||
|
||||
static public void main(String[] args) throws Exception {
|
||||
TestZKRMStateStorePerf perf = new TestZKRMStateStorePerf();
|
||||
|
||||
int res = -1;
|
||||
try {
|
||||
res = ToolRunner.run(perf, args);
|
||||
} catch(Exception e) {
|
||||
System.err.print(StringUtils.stringifyException(e));
|
||||
res = -2;
|
||||
}
|
||||
if(res == -1) {
|
||||
System.err.print(USAGE);
|
||||
}
|
||||
System.exit(res);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user