diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/tools/Corona.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/tools/Corona.java index 971fdd6a1d..6ed334476e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/tools/Corona.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/tools/Corona.java @@ -18,7 +18,7 @@ package org.apache.hadoop.ozone.tools; - +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; import org.apache.commons.cli.OptionBuilder; @@ -27,10 +27,11 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.ozone.OzoneConfiguration; import org.apache.hadoop.ozone.client.OzoneClient; import org.apache.hadoop.ozone.client.OzoneClientFactory; +import org.apache.hadoop.ozone.client.io.OzoneInputStream; import org.apache.hadoop.ozone.client.io.OzoneOutputStream; -import org.apache.hadoop.ozone.OzoneConfiguration; import org.apache.hadoop.util.GenericOptionsParser; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; @@ -39,11 +40,15 @@ import java.io.IOException; import java.io.PrintStream; +import java.util.Arrays; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Supplier; /** * Corona - A tool to populate ozone with data for testing.
@@ -81,6 +86,7 @@ public final class Corona extends Configured implements Tool { private static final String HELP = "help"; private static final String MODE = "mode"; private static final String SOURCE = "source"; + private static final String VALIDATE_WRITE = "validateWrites"; private static final String NUM_OF_THREADS = "numOfThreads"; private static final String NUM_OF_VOLUMES = "numOfVolumes"; private static final String NUM_OF_BUCKETS = "numOfBuckets"; @@ -109,6 +115,8 @@ public final class Corona extends Configured implements Tool { private String numOfBuckets; private String numOfKeys; + private boolean validateWrites; + private OzoneClient ozoneClient; private ExecutorService processor; @@ -125,7 +133,14 @@ public final class Corona extends Configured implements Tool { private AtomicInteger numberOfBucketsCreated; private AtomicLong numberOfKeysAdded; - private Corona(Configuration conf) throws IOException { + private Long totalWritesValidated; + private Long writeValidationSuccessCount; + private Long writeValidationFailureCount; + + private BlockingQueue validationQueue; + + @VisibleForTesting + Corona(Configuration conf) throws IOException { startTime = System.nanoTime(); volumeCreationTime = new AtomicLong(); bucketCreationTime = new AtomicLong(); @@ -159,20 +174,35 @@ public int run(String[] args) throws Exception { LOG.info("Number of Volumes: {}.", numOfVolumes); LOG.info("Number of Buckets per Volume: {}.", numOfBuckets); LOG.info("Number of Keys per Bucket: {}.", numOfKeys); - for(int i = 0; i < Integer.parseInt(numOfVolumes); i++) { + for (int i = 0; i < Integer.parseInt(numOfVolumes); i++) { String volume = "vol-" + i + "-" + RandomStringUtils.randomNumeric(5); processor.submit(new OfflineProcessor(volume)); } - Thread progressbar = getProgressBarThread(); - LOG.info("Starting progress bar Thread."); - progressbar.start(); - processor.shutdown(); - processor.awaitTermination(Integer.MAX_VALUE, TimeUnit.MILLISECONDS); - completed = true; - progressbar.join(); - return 0; } + Thread validator = null; + if(validateWrites) { + totalWritesValidated = 0L; + writeValidationSuccessCount = 0L; + writeValidationFailureCount = 0L; + + validationQueue = + new ArrayBlockingQueue<>(Integer.parseInt(numOfThreads)); + validator = new Thread(new Validator()); + validator.start(); + LOG.info("Data validation is enabled."); + } + Thread progressbar = getProgressBarThread(); + LOG.info("Starting progress bar Thread."); + progressbar.start(); + processor.shutdown(); + processor.awaitTermination(Integer.MAX_VALUE, TimeUnit.MILLISECONDS); + completed = true; + progressbar.join(); + if(validateWrites) { + validator.join(); + } + return 0; } private Options getOzonePetaGenOptions() { @@ -193,6 +223,10 @@ private Options getOzonePetaGenOptions() { "commoncrawl warc file to be used when the mode is online."); Option optSource = OptionBuilder.create(SOURCE); + OptionBuilder.withDescription("do random validation of " + + "data written into ozone, only subset of data is validated."); + Option optValidateWrite = OptionBuilder.create(VALIDATE_WRITE); + OptionBuilder.withArgName("value"); OptionBuilder.hasArg(); OptionBuilder.withDescription("number of threads to be launched " + @@ -220,6 +254,7 @@ private Options getOzonePetaGenOptions() { options.addOption(optHelp); options.addOption(optMode); options.addOption(optSource); + options.addOption(optValidateWrite); options.addOption(optNumOfThreads); options.addOption(optNumOfVolumes); options.addOption(optNumOfBuckets); @@ -239,6 +274,8 @@ private void parseOzonePetaGenOptions(CommandLine cmdLine) { numOfThreads = cmdLine.hasOption(NUM_OF_THREADS) ? cmdLine.getOptionValue(NUM_OF_THREADS) : NUM_OF_THREADS_DEFAULT; + validateWrites = cmdLine.hasOption(VALIDATE_WRITE); + numOfVolumes = cmdLine.hasOption(NUM_OF_VOLUMES) ? cmdLine.getOptionValue(NUM_OF_VOLUMES) : NUM_OF_VOLUMES_DEFAULT; @@ -253,6 +290,9 @@ private void usage() { System.out.println("Options supported are:"); System.out.println("-numOfThreads " + "number of threads to be launched for the run."); + System.out.println("-validateWrites " + + "do random validation of data written into ozone, " + + "only subset of data is validated."); System.out.println("-mode [online | offline] " + "specifies the mode in which Corona should run."); System.out.println("-source " @@ -304,6 +344,7 @@ public void run() { RandomStringUtils.randomNumeric(5); byte[] value = DFSUtil.string2Bytes( RandomStringUtils.randomAscii(10240)); + try { LOG.trace("Adding key: {} in bucket: {} of volume: {}", key, bucket, volume); @@ -317,6 +358,13 @@ public void run() { keyWriteTime.getAndAdd(System.nanoTime() - keyWriteStart); totalBytesWritten.getAndAdd(value.length); numberOfKeysAdded.getAndIncrement(); + if(validateWrites) { + boolean validate = validationQueue.offer( + new KeyValue(volume, bucket, key, value)); + if(validate) { + LOG.trace("Key {}, is queued for validation.", key); + } + } } catch (Exception e) { exception = true; LOG.error("Exception while adding key: {} in bucket: {}" + @@ -341,11 +389,19 @@ private void addShutdownHook() { } private Thread getProgressBarThread() { - long maxValue = Integer.parseInt(numOfVolumes) * - Integer.parseInt(numOfBuckets) * - Integer.parseInt(numOfKeys); + Supplier currentValue; + long maxValue; + + if(mode.equals("online")) { + throw new UnsupportedOperationException("Not yet implemented."); + } else { + currentValue = () -> numberOfKeysAdded.get(); + maxValue = Long.parseLong(numOfVolumes) * + Long.parseLong(numOfBuckets) * + Long.parseLong(numOfKeys); + } Thread progressBarThread = new Thread( - new ProgressBar(System.out, maxValue)); + new ProgressBar(System.out, currentValue, maxValue)); progressBarThread.setName("ProgressBar"); return progressBarThread; } @@ -355,10 +411,13 @@ private class ProgressBar implements Runnable { private static final long REFRESH_INTERVAL = 1000L; private PrintStream stream; + private Supplier currentValue; private long maxValue; - ProgressBar(PrintStream stream, long maxValue) { + ProgressBar(PrintStream stream, Supplier currentValue, + long maxValue) { this.stream = stream; + this.currentValue = currentValue; this.maxValue = maxValue; } @@ -366,9 +425,9 @@ private class ProgressBar implements Runnable { public void run() { try { stream.println(); - long keys; - while((keys = numberOfKeysAdded.get()) < maxValue) { - print(keys); + long value; + while((value = currentValue.get()) < maxValue) { + print(value); if(completed) { break; } @@ -389,11 +448,11 @@ public void run() { /** * Given current value prints the progress bar. * - * @param currentValue + * @param value */ - private void print(long currentValue) { + private void print(long value) { stream.print('\r'); - double percent = 100.0 * currentValue / maxValue; + double percent = 100.0 * value / maxValue; StringBuilder sb = new StringBuilder(); sb.append(" " + String.format("%.2f", percent) + "% |"); @@ -404,7 +463,7 @@ private void print(long currentValue) { sb.append(' '); } sb.append("| "); - sb.append(currentValue + "/" + maxValue); + sb.append(value + "/" + maxValue); long timeInSec = TimeUnit.SECONDS.convert( System.nanoTime() - startTime, TimeUnit.NANOSECONDS); String timeToPrint = String.format("%d:%02d:%02d", timeInSec / 3600, @@ -494,10 +553,158 @@ private void printStats(PrintStream out) { out.println("Time spent in key creation: " + prettyKeyCreationTime); out.println("Time spent in writing keys: " + prettyKeyWriteTime); out.println("Total bytes written: " + totalBytesWritten); + if(validateWrites) { + out.println("Total number of writes validated: " + + totalWritesValidated); + out.println("Writes validated: " + + (100.0 * totalWritesValidated / numberOfKeysAdded.get()) + + " %"); + out.println("Successful validation: " + + writeValidationSuccessCount); + out.println("Unsuccessful validation: " + + writeValidationFailureCount); + } out.println("Total Execution time: " + execTime); out.println("***************************************************"); } + /** + * Returns the number of volumes created. + * @return volume count. + */ + @VisibleForTesting + int getNumberOfVolumesCreated() { + return numberOfVolumesCreated.get(); + } + + /** + * Returns the number of buckets created. + * @return bucket count. + */ + @VisibleForTesting + int getNumberOfBucketsCreated() { + return numberOfBucketsCreated.get(); + } + + /** + * Returns the number of keys added. + * @return keys count. + */ + @VisibleForTesting + long getNumberOfKeysAdded() { + return numberOfKeysAdded.get(); + } + + /** + * Returns true if random validation of write is enabled. + * @return validateWrites + */ + @VisibleForTesting + boolean getValidateWrites() { + return validateWrites; + } + + /** + * Returns the number of keys validated. + * @return validated key count. + */ + @VisibleForTesting + long getTotalKeysValidated() { + return totalWritesValidated; + } + + /** + * Returns the number of successful validation. + * @return successful validation count. + */ + @VisibleForTesting + long getSuccessfulValidationCount() { + return writeValidationSuccessCount; + } + + /** + * Returns the number of unsuccessful validation. + * @return unsuccessful validation count. + */ + @VisibleForTesting + long getUnsuccessfulValidationCount() { + return writeValidationFailureCount; + } + + /** + * Validates the write done in ozone cluster. + */ + private class Validator implements Runnable { + + @Override + public void run() { + while(!completed) { + try { + KeyValue kv = validationQueue.poll(5, TimeUnit.SECONDS); + if(kv != null) { + OzoneInputStream is = ozoneClient. + getKey(kv.volume, kv.bucket, kv.key); + byte[] value = new byte[kv.value.length]; + int length = is.read(value); + totalWritesValidated++; + if (length == kv.value.length && Arrays.equals(value, kv.value)) { + writeValidationSuccessCount++; + } else { + writeValidationFailureCount++; + LOG.warn("Data validation error for key {}/{}/{}", + kv.volume, kv.bucket, kv.key); + LOG.warn("Expected: {}, Actual: {}", + DFSUtil.bytes2String(kv.value), + DFSUtil.bytes2String(value)); + } + } + } catch (IOException | InterruptedException ex) { + LOG.error("Exception while validating write: " + ex.getMessage()); + } + } + } + } + + + + /** + * Wrapper to hold ozone key-value pair. + */ + private static class KeyValue { + + /** + * Volume name associated with the key-value. + */ + private String volume; + + /** + * Bucket name associated with the key-value. + */ + private String bucket; + /** + * Key name associated with the key-value. + */ + private String key; + /** + * Value associated with the key-value. + */ + private byte[] value; + + /** + * Constructs a new ozone key-value pair. + * + * @param key key part + * @param value value part + */ + KeyValue( + String volume, String bucket, String key, byte[] value) { + this.volume = volume; + this.bucket = bucket; + this.key = key; + this.value = value; + } + } + /** * @param args arguments */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/tools/TestCorona.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/tools/TestCorona.java new file mode 100644 index 0000000000..7e87b86322 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/tools/TestCorona.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.tools; + +import org.apache.hadoop.ozone.MiniOzoneCluster; +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.ozone.OzoneConfiguration; +import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.util.ToolRunner; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.PrintStream; +import java.util.ArrayList; +import java.util.List; + +/** + * Tests Corona, with MiniOzoneCluster. + */ +public class TestCorona { + + private static MiniOzoneCluster cluster; + private static OzoneConfiguration conf; + + /** + * Create a MiniDFSCluster for testing. + *

+ * Ozone is made active by setting OZONE_ENABLED = true and + * OZONE_HANDLER_TYPE_KEY = "distributed" + * + * @throws IOException + */ + @BeforeClass + public static void init() throws Exception { + conf = new OzoneConfiguration(); + conf.set(OzoneConfigKeys.OZONE_HANDLER_TYPE_KEY, + OzoneConsts.OZONE_HANDLER_DISTRIBUTED); + cluster = new MiniOzoneCluster.Builder(conf) + .setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build(); + } + + /** + * Shutdown MiniDFSCluster. + */ + @AfterClass + public static void shutdown() { + if (cluster != null) { + cluster.shutdown(); + } + } + + @Test + public void defaultTest() throws Exception { + List args = new ArrayList<>(); + args.add("-numOfVolumes"); + args.add("2"); + args.add("-numOfBuckets"); + args.add("5"); + args.add("-numOfKeys"); + args.add("10"); + Corona corona = new Corona(conf); + int res = ToolRunner.run(conf, corona, + args.toArray(new String[0])); + Assert.assertEquals(2, corona.getNumberOfVolumesCreated()); + Assert.assertEquals(10, corona.getNumberOfBucketsCreated()); + Assert.assertEquals(100, corona.getNumberOfKeysAdded()); + Assert.assertEquals(0, res); + } + + @Test + public void validateWriteTest() throws Exception { + PrintStream originalStream = System.out; + ByteArrayOutputStream outStream = new ByteArrayOutputStream(); + System.setOut(new PrintStream(outStream)); + List args = new ArrayList<>(); + args.add("-validateWrites"); + args.add("-numOfVolumes"); + args.add("2"); + args.add("-numOfBuckets"); + args.add("5"); + args.add("-numOfKeys"); + args.add("10"); + Corona corona = new Corona(conf); + int res = ToolRunner.run(conf, corona, + args.toArray(new String[0])); + Assert.assertEquals(0, res); + Assert.assertEquals(2, corona.getNumberOfVolumesCreated()); + Assert.assertEquals(10, corona.getNumberOfBucketsCreated()); + Assert.assertEquals(100, corona.getNumberOfKeysAdded()); + Assert.assertTrue(corona.getValidateWrites()); + Assert.assertNotEquals(0, corona.getTotalKeysValidated()); + Assert.assertNotEquals(0, corona.getSuccessfulValidationCount()); + Assert.assertEquals(0, corona.getUnsuccessfulValidationCount()); + System.setOut(originalStream); + } + +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/tools/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/tools/package-info.java new file mode 100644 index 0000000000..ea56345352 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/tools/package-info.java @@ -0,0 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.tools; +/** + * Classes related to Ozone tools tests. + */