From 509f31b10990ccb12266e8f4d3f38b7eaf7f6050 Mon Sep 17 00:00:00 2001 From: Shashikant Banerjee Date: Mon, 1 Apr 2019 15:50:03 +0530 Subject: [PATCH] HDDS-1295. Add MiniOzoneChaosCluster to mimic long running workload in a unit test environment. Contributed by Mukul Kumar Singh. --- .../hadoop/ozone/MiniOzoneChaosCluster.java | 224 ++++++++++++++++++ .../hadoop/ozone/MiniOzoneClusterImpl.java | 2 +- .../hadoop/ozone/MiniOzoneLoadGenerator.java | 160 +++++++++++++ .../ozone/TestMiniChaosOzoneCluster.java | 116 +++++++++ 4 files changed, 501 insertions(+), 1 deletion(-) create mode 100644 hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneChaosCluster.java create mode 100644 hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneLoadGenerator.java create mode 100644 hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniChaosOzoneCluster.java diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneChaosCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneChaosCluster.java new file mode 100644 index 0000000000..8e25d48180 --- /dev/null +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneChaosCluster.java @@ -0,0 +1,224 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone; + +import org.apache.commons.lang3.RandomUtils; +import org.apache.hadoop.conf.StorageUnit; +import org.apache.hadoop.hdds.HddsConfigKeys; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.hdds.scm.server.StorageContainerManager; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.ozone.om.OzoneManager; +import org.apache.hadoop.security.authentication.client.AuthenticationException; +import org.apache.log4j.Level; +import org.apache.ratis.grpc.client.GrpcClientProtocolClient; +import org.apache.ratis.util.LogUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.List; + +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.Executors; + +/** + * This class causes random failures in the chaos cluster. + */ +public class MiniOzoneChaosCluster extends MiniOzoneClusterImpl { + + static final Logger LOG = + LoggerFactory.getLogger(MiniOzoneChaosCluster.class); + + private final int numDatanodes; + private final ScheduledExecutorService executorService; + + private ScheduledFuture scheduledFuture; + + private enum FailureMode { + NODES + } + + public MiniOzoneChaosCluster(OzoneConfiguration conf, + OzoneManager ozoneManager, + StorageContainerManager scm, + List hddsDatanodes) { + super(conf, ozoneManager, scm, hddsDatanodes); + + this.executorService = Executors.newSingleThreadScheduledExecutor(); + this.numDatanodes = getHddsDatanodes().size(); + LogUtils.setLogLevel(GrpcClientProtocolClient.LOG, Level.WARN); + } + + // Get the number of datanodes to fail in the cluster. + private int getNumberOfNodesToFail() { + return RandomUtils.nextBoolean() ? 1 : 2; + } + + // Should the failed node wait for SCM to register the even before + // restart, i.e fast restart or not. + private boolean isFastRestart() { + return RandomUtils.nextBoolean(); + } + + // Get the datanode index of the datanode to fail. + private int getNodeToFail() { + return RandomUtils.nextInt() % numDatanodes; + } + + private void failNodes() { + for (int i = 0; i < getNumberOfNodesToFail(); i++) { + boolean failureMode = isFastRestart(); + int failedNodeIndex = getNodeToFail(); + try { + restartHddsDatanode(failedNodeIndex, failureMode); + } catch (Exception e) { + + } + } + } + + private FailureMode getFailureMode() { + return FailureMode. + values()[RandomUtils.nextInt() % FailureMode.values().length]; + } + + // Fail nodes randomly at configured timeout period. + private void fail() { + FailureMode mode = getFailureMode(); + switch (mode) { + case NODES: + failNodes(); + break; + + default: + LOG.error("invalid failure mode:{}", mode); + break; + } + } + + void startChaos(long initialDelay, long period, TimeUnit timeUnit) { + scheduledFuture = executorService.scheduleAtFixedRate(this::fail, + initialDelay, period, timeUnit); + } + + void stopChaos() throws Exception { + scheduledFuture.cancel(false); + scheduledFuture.get(); + } + + public void shutdown() { + super.shutdown(); + try { + stopChaos(); + executorService.shutdown(); + executorService.awaitTermination(1, TimeUnit.DAYS); + } catch (Exception e) { + LOG.error("failed to shutdown MiniOzoneChaosCluster", e); + } + } + + /** + * Builder for configuring the MiniOzoneChaosCluster to run. + */ + public static class Builder extends MiniOzoneClusterImpl.Builder { + + /** + * Creates a new Builder. + * + * @param conf configuration + */ + public Builder(OzoneConfiguration conf) { + super(conf); + } + + /** + * Sets the number of HddsDatanodes to be started as part of + * MiniOzoneChaosCluster. + * + * @param val number of datanodes + * + * @return MiniOzoneChaosCluster.Builder + */ + public Builder setNumDatanodes(int val) { + super.setNumDatanodes(val); + return this; + } + + @Override + void initializeConfiguration() throws IOException { + super.initializeConfiguration(); + conf.setStorageSize(ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY, + 2, StorageUnit.KB); + conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, + 16, StorageUnit.KB); + conf.setStorageSize(OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_FLUSH_SIZE, + 4, StorageUnit.KB); + conf.setStorageSize(OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_MAX_SIZE, + 8, StorageUnit.KB); + conf.setStorageSize(ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE, + 1, StorageUnit.MB); + conf.setTimeDuration(ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT, 1000, + TimeUnit.MILLISECONDS); + conf.setTimeDuration(ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL, 5, + TimeUnit.SECONDS); + conf.setTimeDuration(HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL, 1, + TimeUnit.SECONDS); + conf.setTimeDuration(HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL, 1, + TimeUnit.SECONDS); + conf.setTimeDuration( + ScmConfigKeys.OZONE_SCM_CONTAINER_CREATION_LEASE_TIMEOUT, 5, + TimeUnit.SECONDS); + conf.setTimeDuration(ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, + 1, TimeUnit.SECONDS); + conf.setTimeDuration(HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL, 1, + TimeUnit.SECONDS); + } + + @Override + public MiniOzoneChaosCluster build() throws IOException { + DefaultMetricsSystem.setMiniClusterMode(true); + initializeConfiguration(); + StorageContainerManager scm; + OzoneManager om; + try { + scm = createSCM(); + scm.start(); + om = createOM(); + if(certClient != null) { + om.setCertClient(certClient); + } + } catch (AuthenticationException ex) { + throw new IOException("Unable to build MiniOzoneCluster. ", ex); + } + + om.start(); + final List hddsDatanodes = createHddsDatanodes(scm); + MiniOzoneChaosCluster cluster = + new MiniOzoneChaosCluster(conf, om, scm, hddsDatanodes); + if (startDataNodes) { + cluster.startHddsDatanodes(); + } + return cluster; + } + } +} diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java index e746f3370f..5cd08419c0 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java @@ -510,7 +510,7 @@ void initializeOmStorage(OMStorage omStorage) throws IOException{ * * @throws IOException */ - private OzoneManager createOM() + OzoneManager createOM() throws IOException, AuthenticationException { configureOM(); OMStorage omStore = new OMStorage(conf); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneLoadGenerator.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneLoadGenerator.java new file mode 100644 index 0000000000..efb3b66637 --- /dev/null +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneLoadGenerator.java @@ -0,0 +1,160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone; + +import org.apache.commons.lang3.RandomUtils; +import org.apache.hadoop.conf.StorageUnit; +import org.apache.hadoop.hdds.client.ReplicationFactor; +import org.apache.hadoop.hdds.client.ReplicationType; +import org.apache.hadoop.ozone.client.OzoneBucket; +import org.apache.hadoop.ozone.client.io.OzoneInputStream; +import org.apache.hadoop.ozone.client.io.OzoneOutputStream; +import org.apache.hadoop.util.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * A Simple Load generator for testing. + */ +public class MiniOzoneLoadGenerator { + + static final Logger LOG = + LoggerFactory.getLogger(MiniOzoneLoadGenerator.class); + + private ThreadPoolExecutor writeExecutor; + private int numWriteThreads; + // number of buffer to be allocated, each is allocated with length which + // is multiple of 2, each buffer is populated with random data. + private int numBuffers; + private List buffers; + + private AtomicBoolean isWriteThreadRunning; + + private final OzoneBucket ozoneBucket; + + MiniOzoneLoadGenerator(OzoneBucket bucket, int numThreads, int numBuffers) { + this.ozoneBucket = bucket; + this.numWriteThreads = numThreads; + this.numBuffers = numBuffers; + this.writeExecutor = new ThreadPoolExecutor(numThreads, numThreads, 100, + TimeUnit.SECONDS, new ArrayBlockingQueue<>(1024), + new ThreadPoolExecutor.CallerRunsPolicy()); + this.writeExecutor.prestartAllCoreThreads(); + + this.isWriteThreadRunning = new AtomicBoolean(false); + + // allocate buffers and populate random data. + buffers = new ArrayList<>(); + for (int i = 0; i < numBuffers; i++) { + int size = (int) StorageUnit.KB.toBytes(1 << i); + ByteBuffer buffer = ByteBuffer.allocate(size); + buffer.put(RandomUtils.nextBytes(size)); + buffers.add(buffer); + } + } + + // Start IO load on an Ozone bucket. + private void load(long runTimeMillis) { + LOG.info("Started IO Thread" + Thread.currentThread().getId()); + String threadName = Thread.currentThread().getName(); + long startTime = Time.monotonicNow(); + + while (isWriteThreadRunning.get() && + (Time.monotonicNow() < startTime + runTimeMillis)) { + // choose a random buffer. + int index = RandomUtils.nextInt(); + ByteBuffer buffer = buffers.get(index % numBuffers); + int bufferCapacity = buffer.capacity(); + + String keyName = threadName + "-" + index; + try (OzoneOutputStream stream = ozoneBucket.createKey(keyName, + bufferCapacity, ReplicationType.RATIS, ReplicationFactor.THREE, + new HashMap<>())) { + stream.write(buffer.array()); + } catch (Exception e) { + LOG.error("LOADGEN: Create key:{} failed with exception", keyName, e); + break; + } + + try (OzoneInputStream stream = ozoneBucket.readKey(keyName)) { + byte[] readBuffer = new byte[bufferCapacity]; + int readLen = stream.read(readBuffer); + + if (readLen < bufferCapacity) { + LOG.error("LOADGEN: Read mismatch, key:{} read data length:{} is " + + "smaller than excepted:{}", keyName, readLen, bufferCapacity); + break; + } + + if (!Arrays.equals(readBuffer, buffer.array())) { + LOG.error("LOADGEN: Read mismatch, key:{} Read data does not match " + + "the written data", keyName); + break; + } + + } catch (Exception e) { + LOG.error("Read key:{} failed with exception", keyName, e); + break; + } + + } + // This will terminate other threads too. + isWriteThreadRunning.set(false); + } + + public void startIO(long time, TimeUnit timeUnit) { + List> writeFutures = new ArrayList<>(); + if (isWriteThreadRunning.compareAndSet(false, true)) { + // Start the IO thread + for (int i = 0; i < numWriteThreads; i++) { + writeFutures.add( + CompletableFuture.runAsync(() -> load(timeUnit.toMillis(time)), + writeExecutor)); + } + + // Wait for IO to complete + for (CompletableFuture f : writeFutures) { + try { + f.get(); + } catch (Throwable t) { + LOG.error("startIO failed with exception", t); + } + } + } + } + + public void shutdownLoadGenerator() { + try { + writeExecutor.shutdown(); + writeExecutor.awaitTermination(1, TimeUnit.DAYS); + } catch (Exception e) { + LOG.error("error while closing ", e); + } + } +} diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniChaosOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniChaosOzoneCluster.java new file mode 100644 index 0000000000..04383519bc --- /dev/null +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniChaosOzoneCluster.java @@ -0,0 +1,116 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone; + +import org.apache.commons.lang3.RandomStringUtils; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.ozone.client.ObjectStore; +import org.apache.hadoop.ozone.client.OzoneBucket; +import org.apache.hadoop.ozone.client.OzoneVolume; +import org.junit.BeforeClass; +import org.junit.AfterClass; +import org.junit.Test; +import picocli.CommandLine.Command; +import picocli.CommandLine.Option; +import picocli.CommandLine; + + +import java.util.concurrent.TimeUnit; + +/** + * Test Read Write with Mini Ozone Chaos Cluster. + */ +@Command(description = "Starts IO with MiniOzoneChaosCluster", + name = "chaos", mixinStandardHelpOptions = true) +public class TestMiniChaosOzoneCluster implements Runnable { + + @Option(names = {"-d", "--numDatanodes"}, + description = "num of datanodes") + private static int numDatanodes = 20; + + @Option(names = {"-t", "--numThreads"}, + description = "num of IO threads") + private static int numThreads = 10; + + @Option(names = {"-b", "--numBuffers"}, + description = "num of IO buffers") + private static int numBuffers = 16; + + @Option(names = {"-m", "--numMinutes"}, + description = "total run time") + private static int numMinutes = 1440; // 1 day by default + + @Option(names = {"-i", "--failureInterval"}, + description = "time between failure events in seconds") + private static int failureInterval = 5; // 5 second period between failures. + + private static MiniOzoneChaosCluster cluster; + private static MiniOzoneLoadGenerator loadGenerator; + + @BeforeClass + public static void init() throws Exception { + cluster = new MiniOzoneChaosCluster.Builder(new OzoneConfiguration()) + .setNumDatanodes(numDatanodes).build(); + cluster.waitForClusterToBeReady(); + + String volumeName = RandomStringUtils.randomAlphabetic(10).toLowerCase(); + String bucketName = RandomStringUtils.randomAlphabetic(10).toLowerCase(); + ObjectStore store = cluster.getRpcClient().getObjectStore(); + store.createVolume(volumeName); + OzoneVolume volume = store.getVolume(volumeName); + volume.createBucket(bucketName); + OzoneBucket ozoneBucket = volume.getBucket(bucketName); + loadGenerator = + new MiniOzoneLoadGenerator(ozoneBucket, numThreads, numBuffers); + } + + /** + * Shutdown MiniDFSCluster. + */ + @AfterClass + public static void shutdown() { + if (loadGenerator != null) { + loadGenerator.shutdownLoadGenerator(); + } + + if (cluster != null) { + cluster.shutdown(); + } + } + + public void run() { + try { + init(); + cluster.startChaos(5, failureInterval, TimeUnit.SECONDS); + loadGenerator.startIO(numMinutes, TimeUnit.MINUTES); + } catch (Exception e) { + } finally { + shutdown(); + } + } + + public static void main(String... args) { + CommandLine.run(new TestMiniChaosOzoneCluster(), System.err, args); + } + + @Test + public void testReadWriteWithChaosCluster() throws Exception { + cluster.startChaos(5, 1, TimeUnit.SECONDS); + loadGenerator.startIO(1, TimeUnit.MINUTES); + } +}