HDDS-1295. Add MiniOzoneChaosCluster to mimic long running workload in a unit test environment. Contributed by Mukul Kumar Singh.

This commit is contained in:
Shashikant Banerjee 2019-04-01 15:50:03 +05:30
parent 53a86e2b8e
commit 509f31b109
4 changed files with 501 additions and 1 deletions

View File

@ -0,0 +1,224 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone;
import org.apache.commons.lang3.RandomUtils;
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.log4j.Level;
import org.apache.ratis.grpc.client.GrpcClientProtocolClient;
import org.apache.ratis.util.LogUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.Executors;
/**
* This class causes random failures in the chaos cluster.
*/
public class MiniOzoneChaosCluster extends MiniOzoneClusterImpl {
static final Logger LOG =
LoggerFactory.getLogger(MiniOzoneChaosCluster.class);
private final int numDatanodes;
private final ScheduledExecutorService executorService;
private ScheduledFuture scheduledFuture;
private enum FailureMode {
NODES
}
public MiniOzoneChaosCluster(OzoneConfiguration conf,
OzoneManager ozoneManager,
StorageContainerManager scm,
List<HddsDatanodeService> hddsDatanodes) {
super(conf, ozoneManager, scm, hddsDatanodes);
this.executorService = Executors.newSingleThreadScheduledExecutor();
this.numDatanodes = getHddsDatanodes().size();
LogUtils.setLogLevel(GrpcClientProtocolClient.LOG, Level.WARN);
}
// Get the number of datanodes to fail in the cluster.
private int getNumberOfNodesToFail() {
return RandomUtils.nextBoolean() ? 1 : 2;
}
// Should the failed node wait for SCM to register the even before
// restart, i.e fast restart or not.
private boolean isFastRestart() {
return RandomUtils.nextBoolean();
}
// Get the datanode index of the datanode to fail.
private int getNodeToFail() {
return RandomUtils.nextInt() % numDatanodes;
}
private void failNodes() {
for (int i = 0; i < getNumberOfNodesToFail(); i++) {
boolean failureMode = isFastRestart();
int failedNodeIndex = getNodeToFail();
try {
restartHddsDatanode(failedNodeIndex, failureMode);
} catch (Exception e) {
}
}
}
private FailureMode getFailureMode() {
return FailureMode.
values()[RandomUtils.nextInt() % FailureMode.values().length];
}
// Fail nodes randomly at configured timeout period.
private void fail() {
FailureMode mode = getFailureMode();
switch (mode) {
case NODES:
failNodes();
break;
default:
LOG.error("invalid failure mode:{}", mode);
break;
}
}
void startChaos(long initialDelay, long period, TimeUnit timeUnit) {
scheduledFuture = executorService.scheduleAtFixedRate(this::fail,
initialDelay, period, timeUnit);
}
void stopChaos() throws Exception {
scheduledFuture.cancel(false);
scheduledFuture.get();
}
public void shutdown() {
super.shutdown();
try {
stopChaos();
executorService.shutdown();
executorService.awaitTermination(1, TimeUnit.DAYS);
} catch (Exception e) {
LOG.error("failed to shutdown MiniOzoneChaosCluster", e);
}
}
/**
* Builder for configuring the MiniOzoneChaosCluster to run.
*/
public static class Builder extends MiniOzoneClusterImpl.Builder {
/**
* Creates a new Builder.
*
* @param conf configuration
*/
public Builder(OzoneConfiguration conf) {
super(conf);
}
/**
* Sets the number of HddsDatanodes to be started as part of
* MiniOzoneChaosCluster.
*
* @param val number of datanodes
*
* @return MiniOzoneChaosCluster.Builder
*/
public Builder setNumDatanodes(int val) {
super.setNumDatanodes(val);
return this;
}
@Override
void initializeConfiguration() throws IOException {
super.initializeConfiguration();
conf.setStorageSize(ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY,
2, StorageUnit.KB);
conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE,
16, StorageUnit.KB);
conf.setStorageSize(OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_FLUSH_SIZE,
4, StorageUnit.KB);
conf.setStorageSize(OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_MAX_SIZE,
8, StorageUnit.KB);
conf.setStorageSize(ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
1, StorageUnit.MB);
conf.setTimeDuration(ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT, 1000,
TimeUnit.MILLISECONDS);
conf.setTimeDuration(ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL, 5,
TimeUnit.SECONDS);
conf.setTimeDuration(HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL, 1,
TimeUnit.SECONDS);
conf.setTimeDuration(HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL, 1,
TimeUnit.SECONDS);
conf.setTimeDuration(
ScmConfigKeys.OZONE_SCM_CONTAINER_CREATION_LEASE_TIMEOUT, 5,
TimeUnit.SECONDS);
conf.setTimeDuration(ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL,
1, TimeUnit.SECONDS);
conf.setTimeDuration(HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL, 1,
TimeUnit.SECONDS);
}
@Override
public MiniOzoneChaosCluster build() throws IOException {
DefaultMetricsSystem.setMiniClusterMode(true);
initializeConfiguration();
StorageContainerManager scm;
OzoneManager om;
try {
scm = createSCM();
scm.start();
om = createOM();
if(certClient != null) {
om.setCertClient(certClient);
}
} catch (AuthenticationException ex) {
throw new IOException("Unable to build MiniOzoneCluster. ", ex);
}
om.start();
final List<HddsDatanodeService> hddsDatanodes = createHddsDatanodes(scm);
MiniOzoneChaosCluster cluster =
new MiniOzoneChaosCluster(conf, om, scm, hddsDatanodes);
if (startDataNodes) {
cluster.startHddsDatanodes();
}
return cluster;
}
}
}

View File

@ -510,7 +510,7 @@ void initializeOmStorage(OMStorage omStorage) throws IOException{
*
* @throws IOException
*/
private OzoneManager createOM()
OzoneManager createOM()
throws IOException, AuthenticationException {
configureOM();
OMStorage omStore = new OMStorage(conf);

View File

@ -0,0 +1,160 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone;
import org.apache.commons.lang3.RandomUtils;
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.client.ReplicationFactor;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.ozone.client.OzoneBucket;
import org.apache.hadoop.ozone.client.io.OzoneInputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* A Simple Load generator for testing.
*/
public class MiniOzoneLoadGenerator {
static final Logger LOG =
LoggerFactory.getLogger(MiniOzoneLoadGenerator.class);
private ThreadPoolExecutor writeExecutor;
private int numWriteThreads;
// number of buffer to be allocated, each is allocated with length which
// is multiple of 2, each buffer is populated with random data.
private int numBuffers;
private List<ByteBuffer> buffers;
private AtomicBoolean isWriteThreadRunning;
private final OzoneBucket ozoneBucket;
MiniOzoneLoadGenerator(OzoneBucket bucket, int numThreads, int numBuffers) {
this.ozoneBucket = bucket;
this.numWriteThreads = numThreads;
this.numBuffers = numBuffers;
this.writeExecutor = new ThreadPoolExecutor(numThreads, numThreads, 100,
TimeUnit.SECONDS, new ArrayBlockingQueue<>(1024),
new ThreadPoolExecutor.CallerRunsPolicy());
this.writeExecutor.prestartAllCoreThreads();
this.isWriteThreadRunning = new AtomicBoolean(false);
// allocate buffers and populate random data.
buffers = new ArrayList<>();
for (int i = 0; i < numBuffers; i++) {
int size = (int) StorageUnit.KB.toBytes(1 << i);
ByteBuffer buffer = ByteBuffer.allocate(size);
buffer.put(RandomUtils.nextBytes(size));
buffers.add(buffer);
}
}
// Start IO load on an Ozone bucket.
private void load(long runTimeMillis) {
LOG.info("Started IO Thread" + Thread.currentThread().getId());
String threadName = Thread.currentThread().getName();
long startTime = Time.monotonicNow();
while (isWriteThreadRunning.get() &&
(Time.monotonicNow() < startTime + runTimeMillis)) {
// choose a random buffer.
int index = RandomUtils.nextInt();
ByteBuffer buffer = buffers.get(index % numBuffers);
int bufferCapacity = buffer.capacity();
String keyName = threadName + "-" + index;
try (OzoneOutputStream stream = ozoneBucket.createKey(keyName,
bufferCapacity, ReplicationType.RATIS, ReplicationFactor.THREE,
new HashMap<>())) {
stream.write(buffer.array());
} catch (Exception e) {
LOG.error("LOADGEN: Create key:{} failed with exception", keyName, e);
break;
}
try (OzoneInputStream stream = ozoneBucket.readKey(keyName)) {
byte[] readBuffer = new byte[bufferCapacity];
int readLen = stream.read(readBuffer);
if (readLen < bufferCapacity) {
LOG.error("LOADGEN: Read mismatch, key:{} read data length:{} is " +
"smaller than excepted:{}", keyName, readLen, bufferCapacity);
break;
}
if (!Arrays.equals(readBuffer, buffer.array())) {
LOG.error("LOADGEN: Read mismatch, key:{} Read data does not match " +
"the written data", keyName);
break;
}
} catch (Exception e) {
LOG.error("Read key:{} failed with exception", keyName, e);
break;
}
}
// This will terminate other threads too.
isWriteThreadRunning.set(false);
}
public void startIO(long time, TimeUnit timeUnit) {
List<CompletableFuture<Void>> writeFutures = new ArrayList<>();
if (isWriteThreadRunning.compareAndSet(false, true)) {
// Start the IO thread
for (int i = 0; i < numWriteThreads; i++) {
writeFutures.add(
CompletableFuture.runAsync(() -> load(timeUnit.toMillis(time)),
writeExecutor));
}
// Wait for IO to complete
for (CompletableFuture<Void> f : writeFutures) {
try {
f.get();
} catch (Throwable t) {
LOG.error("startIO failed with exception", t);
}
}
}
}
public void shutdownLoadGenerator() {
try {
writeExecutor.shutdown();
writeExecutor.awaitTermination(1, TimeUnit.DAYS);
} catch (Exception e) {
LOG.error("error while closing ", e);
}
}
}

View File

@ -0,0 +1,116 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.client.ObjectStore;
import org.apache.hadoop.ozone.client.OzoneBucket;
import org.apache.hadoop.ozone.client.OzoneVolume;
import org.junit.BeforeClass;
import org.junit.AfterClass;
import org.junit.Test;
import picocli.CommandLine.Command;
import picocli.CommandLine.Option;
import picocli.CommandLine;
import java.util.concurrent.TimeUnit;
/**
* Test Read Write with Mini Ozone Chaos Cluster.
*/
@Command(description = "Starts IO with MiniOzoneChaosCluster",
name = "chaos", mixinStandardHelpOptions = true)
public class TestMiniChaosOzoneCluster implements Runnable {
@Option(names = {"-d", "--numDatanodes"},
description = "num of datanodes")
private static int numDatanodes = 20;
@Option(names = {"-t", "--numThreads"},
description = "num of IO threads")
private static int numThreads = 10;
@Option(names = {"-b", "--numBuffers"},
description = "num of IO buffers")
private static int numBuffers = 16;
@Option(names = {"-m", "--numMinutes"},
description = "total run time")
private static int numMinutes = 1440; // 1 day by default
@Option(names = {"-i", "--failureInterval"},
description = "time between failure events in seconds")
private static int failureInterval = 5; // 5 second period between failures.
private static MiniOzoneChaosCluster cluster;
private static MiniOzoneLoadGenerator loadGenerator;
@BeforeClass
public static void init() throws Exception {
cluster = new MiniOzoneChaosCluster.Builder(new OzoneConfiguration())
.setNumDatanodes(numDatanodes).build();
cluster.waitForClusterToBeReady();
String volumeName = RandomStringUtils.randomAlphabetic(10).toLowerCase();
String bucketName = RandomStringUtils.randomAlphabetic(10).toLowerCase();
ObjectStore store = cluster.getRpcClient().getObjectStore();
store.createVolume(volumeName);
OzoneVolume volume = store.getVolume(volumeName);
volume.createBucket(bucketName);
OzoneBucket ozoneBucket = volume.getBucket(bucketName);
loadGenerator =
new MiniOzoneLoadGenerator(ozoneBucket, numThreads, numBuffers);
}
/**
* Shutdown MiniDFSCluster.
*/
@AfterClass
public static void shutdown() {
if (loadGenerator != null) {
loadGenerator.shutdownLoadGenerator();
}
if (cluster != null) {
cluster.shutdown();
}
}
public void run() {
try {
init();
cluster.startChaos(5, failureInterval, TimeUnit.SECONDS);
loadGenerator.startIO(numMinutes, TimeUnit.MINUTES);
} catch (Exception e) {
} finally {
shutdown();
}
}
public static void main(String... args) {
CommandLine.run(new TestMiniChaosOzoneCluster(), System.err, args);
}
@Test
public void testReadWriteWithChaosCluster() throws Exception {
cluster.startChaos(5, 1, TimeUnit.SECONDS);
loadGenerator.startIO(1, TimeUnit.MINUTES);
}
}