From 6225622dee9d62906deb1ec2ef7b02931ab372f4 Mon Sep 17 00:00:00 2001 From: Anu Engineer Date: Thu, 20 Jul 2017 17:59:53 -0700 Subject: [PATCH] HDFS-12071. Ozone: Corona: Implementation of Corona. Contributed by Nandakumar. --- .../hadoop/ozone/OzoneClientFactory.java | 69 ++++ .../java/org/apache/hadoop/ozone/Corona.java | 323 ++++++++++++++++++ .../apache/hadoop/test/OzoneTestDriver.java | 59 ++++ 3 files changed, 451 insertions(+) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClientFactory.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/Corona.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/test/OzoneTestDriver.java diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClientFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClientFactory.java new file mode 100644 index 0000000000..7866f58ce0 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClientFactory.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.ozone; + +import org.apache.hadoop.conf.Configuration; + +import java.io.IOException; + +/** + * Factory class to create different types of OzoneClients. + */ +public final class OzoneClientFactory { + + /** + * Private constructor, class is not meant to be initialized. + */ + private OzoneClientFactory(){} + + private static Configuration configuration; + + /** + * Returns an OzoneClient which will use RPC protocol to perform + * client operations. + * + * @return OzoneClient + * @throws IOException + */ + public static OzoneClient getRpcClient() throws IOException { + return new OzoneClientImpl(getConfiguration()); + } + + /** + * Sets the configuration, which will be used while creating OzoneClient. + * + * @param conf + */ + public static void setConfiguration(Configuration conf) { + configuration = conf; + } + + /** + * Returns the configuration if it's already set, else creates a new + * {@link OzoneConfiguration} and returns it. + * + * @return Configuration + */ + private static synchronized Configuration getConfiguration() { + if(configuration == null) { + setConfiguration(new OzoneConfiguration()); + } + return configuration; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/Corona.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/Corona.java new file mode 100644 index 0000000000..dc63a5f4d6 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/Corona.java @@ -0,0 +1,323 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone; + + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.OptionBuilder; +import org.apache.commons.cli.Options; +import org.apache.commons.lang.RandomStringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.ozone.io.OzoneOutputStream; +import org.apache.hadoop.util.GenericOptionsParser; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.PrintStream; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Corona - A tool to populate ozone with data for testing.
+ * This is not a map-reduce program and this is not for benchmarking + * Ozone write throughput.
 + * It supports both online and offline modes. The default mode is offline;
 + * the -mode option can be used to change it.
 + *

+ * In online mode, active internet connection is required, + * common crawl data from AWS will be used.
+ * Default source is:
+ * https://commoncrawl.s3.amazonaws.com/crawl-data/ + * CC-MAIN-2017-17/warc.paths.gz
+ * (it contains the path to actual data segment)
 + * User can override this using -source.
 + * The default source above is derived from the URL of the Common Crawl
 + * data set.
 + *

+ * In offline mode, the data will be random bytes and + * size of data will be 10 KB.
+ * + */ +public final class Corona extends Configured implements Tool { + + private static final String HELP = "help"; + private static final String MODE = "mode"; + private static final String SOURCE = "source"; + private static final String NUM_OF_VOLUMES = "numOfVolumes"; + private static final String NUM_OF_BUCKETS = "numOfBuckets"; + private static final String NUM_OF_KEYS = "numOfKeys"; + + private static final String MODE_DEFAULT = "offline"; + private static final String SOURCE_DEFAULT = + "https://commoncrawl.s3.amazonaws.com/" + + "crawl-data/CC-MAIN-2017-17/warc.paths.gz"; + private static final String NUM_OF_VOLUMES_DEFAULT = "10"; + private static final String NUM_OF_BUCKETS_DEFAULT = "1000"; + private static final String NUM_OF_KEYS_DEFAULT = "500000"; + + private static final int NUM_OF_THREADS_DEFAULT = 10; + + private static final Logger LOG = + LoggerFactory.getLogger(Corona.class); + + private boolean printUsage = false; + + private String mode; + private String source; + private String numOfVolumes; + private String numOfBuckets; + private String numOfKeys; + + private OzoneClient ozoneClient; + private ExecutorService processor; + + private long startTime; + + private AtomicInteger numberOfVolumesCreated; + private AtomicInteger numberOfBucketsCreated; + private AtomicLong numberOfKeysAdded; + + private Corona(Configuration conf) throws IOException { + startTime = System.nanoTime(); + numberOfVolumesCreated = new AtomicInteger(); + numberOfBucketsCreated = new AtomicInteger(); + numberOfKeysAdded = new AtomicLong(); + OzoneClientFactory.setConfiguration(conf); + ozoneClient = OzoneClientFactory.getRpcClient(); + processor = Executors.newFixedThreadPool(NUM_OF_THREADS_DEFAULT); + } + + @Override + public int run(String[] args) throws Exception { + GenericOptionsParser parser = new GenericOptionsParser(getConf(), + getOzonePetaGenOptions(), args); + parseOzonePetaGenOptions(parser.getCommandLine()); + if(printUsage) { + usage(); + 
System.exit(0); + } + addShutdownHook(); + if(mode.equals("online")) { + LOG.info("Mode: online"); + throw new UnsupportedOperationException("Not yet implemented."); + } else { + LOG.info("Mode: offline"); + LOG.info("Number of Volumes: {}.", numOfBuckets); + LOG.info("Number of Buckets per Volume: {}.", numOfBuckets); + LOG.info("Number of Keys per Bucket: {}.", numOfKeys); + for(int i = 0; i < Integer.parseInt(numOfVolumes); i++) { + String volume = "vol-" + i + "-" + + RandomStringUtils.randomNumeric(5); + processor.submit(new OfflineProcessor(volume)); + } + processor.shutdown(); + processor.awaitTermination(Integer.MAX_VALUE, TimeUnit.MILLISECONDS); + return 0; + } + } + + private Options getOzonePetaGenOptions() { + Options options = new Options(); + + OptionBuilder.withDescription("prints usage."); + Option optHelp = OptionBuilder.create(HELP); + + OptionBuilder.withArgName("online | offline"); + OptionBuilder.hasArg(); + OptionBuilder.withDescription("specifies the mode of " + + "Corona run."); + Option optMode = OptionBuilder.create(MODE); + + OptionBuilder.withArgName("source url"); + OptionBuilder.hasArg(); + OptionBuilder.withDescription("specifies the URL of s3 " + + "commoncrawl warc file to be used when the mode is online."); + Option optSource = OptionBuilder.create(SOURCE); + + OptionBuilder.withArgName("value"); + OptionBuilder.hasArg(); + OptionBuilder.withDescription("specifies number of Volumes to be " + + "created in offline mode"); + Option optNumOfVolumes = OptionBuilder.create(NUM_OF_VOLUMES); + + OptionBuilder.withArgName("value"); + OptionBuilder.hasArg(); + OptionBuilder.withDescription("specifies number of Buckets to be " + + "created per Volume in offline mode"); + Option optNumOfBuckets = OptionBuilder.create(NUM_OF_BUCKETS); + + OptionBuilder.withArgName("value"); + OptionBuilder.hasArg(); + OptionBuilder.withDescription("specifies number of Keys to be " + + "created per Bucket in offline mode"); + Option optNumOfKeys = 
OptionBuilder.create(NUM_OF_KEYS); + + options.addOption(optHelp); + options.addOption(optMode); + options.addOption(optSource); + options.addOption(optNumOfVolumes); + options.addOption(optNumOfBuckets); + options.addOption(optNumOfKeys); + return options; + } + + private void parseOzonePetaGenOptions(CommandLine cmdLine) { + printUsage = cmdLine.hasOption(HELP); + + mode = cmdLine.hasOption(MODE) ? + cmdLine.getOptionValue(MODE) : MODE_DEFAULT; + + source = cmdLine.hasOption(SOURCE) ? + cmdLine.getOptionValue(SOURCE) : SOURCE_DEFAULT; + + numOfVolumes = cmdLine.hasOption(NUM_OF_VOLUMES) ? + cmdLine.getOptionValue(NUM_OF_VOLUMES) : NUM_OF_VOLUMES_DEFAULT; + + numOfBuckets = cmdLine.hasOption(NUM_OF_BUCKETS) ? + cmdLine.getOptionValue(NUM_OF_BUCKETS) : NUM_OF_BUCKETS_DEFAULT; + + numOfKeys = cmdLine.hasOption(NUM_OF_KEYS) ? + cmdLine.getOptionValue(NUM_OF_KEYS) : NUM_OF_KEYS_DEFAULT; + } + + private void usage() { + System.out.println("Options supported are:"); + System.out.println("-mode [online | offline] " + + "specifies the mode in which Corona should run."); + System.out.println("-source " + + "specifies the URL of s3 commoncrawl warc file to " + + "be used when the mode is online."); + System.out.println("-numOfVolumes " + + "specifies number of Volumes to be created in offline mode"); + System.out.println("-numOfBuckets " + + "specifies number of Buckets to be created per Volume " + + "in offline mode"); + System.out.println("-numOfKeys " + + "specifies number of Keys to be created per Bucket " + + "in offline mode"); + System.out.println("-help " + + "prints usage."); + System.out.println(); + } + + private class OfflineProcessor implements Runnable { + + private int totalBuckets; + private int totalKeys; + private String volume; + + OfflineProcessor(String volume) throws Exception { + this.totalBuckets = Integer.parseInt(numOfBuckets); + this.totalKeys = Integer.parseInt(numOfKeys); + this.volume = volume; + LOG.trace("Creating volume: {}", volume); + 
ozoneClient.createVolume(this.volume); + numberOfVolumesCreated.getAndIncrement(); + } + + @Override + public void run() { + for (int j = 0; j < totalBuckets; j++) { + String bucket = "bucket-" + j + "-" + + RandomStringUtils.randomNumeric(5); + try { + LOG.trace("Creating bucket: {} in volume: {}", bucket, volume); + ozoneClient.createBucket(volume, bucket); + numberOfBucketsCreated.getAndIncrement(); + for (int k = 0; k < totalKeys; k++) { + String key = "key-" + k + "-" + + RandomStringUtils.randomNumeric(5); + byte[] value = RandomStringUtils.randomAscii(10240).getBytes(); + try { + LOG.trace("Adding key: {} in bucket: {} of volume: {}", + key, bucket, volume); + OzoneOutputStream os = ozoneClient.createKey( + volume, bucket, key, value.length); + os.write(value); + os.close(); + numberOfKeysAdded.getAndIncrement(); + } catch (Exception e) { + LOG.error("Exception while adding key: {} in bucket: {}" + + " of volume: {}.", key, bucket, volume, e); + } + } + } catch (Exception e) { + LOG.error("Exception while creating bucket: {}" + + " in volume: {}.", bucket, volume, e); + } + } + } + } + + /** + * Adds ShutdownHook to print statistics. + */ + private void addShutdownHook() { + Runtime.getRuntime().addShutdownHook(new Thread() { + public void run() { + printStats(System.out); + } + }); + } + + /** + * Prints stats of {@link Corona} run to the PrintStream. + * + * @param out PrintStream + */ + private void printStats(PrintStream out) { + long timeInSec = TimeUnit.SECONDS.convert( + System.nanoTime() - startTime, TimeUnit.NANOSECONDS); + String timeToPrint = timeInSec < 60 ? 
timeInSec + " seconds" : + TimeUnit.MINUTES.convert(timeInSec, TimeUnit.SECONDS) + " minuites"; + out.println("***************************************************"); + out.println("Number of Volumes created: " + numberOfVolumesCreated); + out.println("Number of Buckets created: " + numberOfBucketsCreated); + out.println("Number of Keys added: " + numberOfKeysAdded); + out.println("Execution time: " + timeToPrint); + out.println("***************************************************"); + } + + /** + * @param args arguments + */ + public static void main(String[] args) throws Exception { + Configuration conf = new OzoneConfiguration(); + int res = ToolRunner.run(conf, new Corona(conf), args); + System.exit(res); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/test/OzoneTestDriver.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/test/OzoneTestDriver.java new file mode 100644 index 0000000000..ecb929d01a --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/test/OzoneTestDriver.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.test; + +import org.apache.hadoop.ozone.Corona; +import org.apache.hadoop.util.ProgramDriver; + +/** + * Driver for Ozone tests. + */ +public class OzoneTestDriver { + + private final ProgramDriver pgd; + + public OzoneTestDriver() { + this(new ProgramDriver()); + } + + public OzoneTestDriver(ProgramDriver pgd) { + this.pgd = pgd; + try { + pgd.addClass("corona", Corona.class, + "Populates ozone with data."); + } catch(Throwable e) { + e.printStackTrace(); + } + } + + public void run(String[] args) { + int exitCode = -1; + try { + exitCode = pgd.run(args); + } catch(Throwable e) { + e.printStackTrace(); + } + + System.exit(exitCode); + } + + public static void main(String[] args){ + new OzoneTestDriver().run(args); + } +}