diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClientImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClientImpl.java index 11e8fffe02..01807b0896 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClientImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClientImpl.java @@ -55,7 +55,11 @@ import org.slf4j.LoggerFactory; import java.io.Closeable; import java.io.IOException; import java.net.InetSocketAddress; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.UUID; import java.util.stream.Collectors; /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/Corona.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/Corona.java index dc63a5f4d6..581ccc606a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/Corona.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/Corona.java @@ -77,6 +77,7 @@ public final class Corona extends Configured implements Tool { private static final String HELP = "help"; private static final String MODE = "mode"; private static final String SOURCE = "source"; + private static final String NUM_OF_THREADS = "numOfThreads"; private static final String NUM_OF_VOLUMES = "numOfVolumes"; private static final String NUM_OF_BUCKETS = "numOfBuckets"; private static final String NUM_OF_KEYS = "numOfKeys"; @@ -85,19 +86,21 @@ public final class Corona extends Configured implements Tool { private static final String SOURCE_DEFAULT = "https://commoncrawl.s3.amazonaws.com/" + "crawl-data/CC-MAIN-2017-17/warc.paths.gz"; + private static final String NUM_OF_THREADS_DEFAULT = "10"; private static final String NUM_OF_VOLUMES_DEFAULT = "10"; private static final String NUM_OF_BUCKETS_DEFAULT = "1000"; private static final String NUM_OF_KEYS_DEFAULT = "500000"; - private static final int NUM_OF_THREADS_DEFAULT = 10; - private static final Logger LOG = LoggerFactory.getLogger(Corona.class); private boolean printUsage = false; + private boolean completed = false; + private boolean exception = false; private String mode; private String source; + private String numOfThreads; private String numOfVolumes; private String numOfBuckets; private String numOfKeys; @@ -107,18 +110,29 @@ public final class Corona extends Configured implements Tool { private long startTime; + private AtomicLong volumeCreationTime; + private AtomicLong bucketCreationTime; + private AtomicLong keyCreationTime; + private AtomicLong keyWriteTime; + + private AtomicLong totalBytesWritten; + private AtomicInteger numberOfVolumesCreated; private AtomicInteger numberOfBucketsCreated; private AtomicLong numberOfKeysAdded; private Corona(Configuration conf) throws IOException { startTime = System.nanoTime(); + volumeCreationTime = new AtomicLong(); + bucketCreationTime = new AtomicLong(); + keyCreationTime = new AtomicLong(); + keyWriteTime = new AtomicLong(); + totalBytesWritten = new AtomicLong(); numberOfVolumesCreated = new AtomicInteger(); numberOfBucketsCreated = new AtomicInteger(); numberOfKeysAdded = new AtomicLong(); OzoneClientFactory.setConfiguration(conf); ozoneClient = OzoneClientFactory.getRpcClient(); - processor = Executors.newFixedThreadPool(NUM_OF_THREADS_DEFAULT); } @Override @@ -130,13 +144,15 @@ public final class Corona extends Configured implements Tool { usage(); System.exit(0); } + LOG.info("Number of Threads: " + numOfThreads); + processor = Executors.newFixedThreadPool(Integer.parseInt(numOfThreads)); addShutdownHook(); if(mode.equals("online")) { LOG.info("Mode: online"); throw new UnsupportedOperationException("Not yet implemented."); } else { LOG.info("Mode: offline"); - LOG.info("Number of Volumes: {}.", numOfBuckets); + LOG.info("Number of Volumes: {}.", numOfVolumes); LOG.info("Number of Buckets per Volume: {}.", numOfBuckets); LOG.info("Number of Keys per Bucket: {}.", numOfKeys); for(int i = 0; i < Integer.parseInt(numOfVolumes); i++) { @@ -144,8 +160,13 @@ public final class Corona extends Configured implements Tool { RandomStringUtils.randomNumeric(5); processor.submit(new OfflineProcessor(volume)); } + Thread progressbar = getProgressBarThread(); + LOG.info("Starting progress bar Thread."); + progressbar.start(); processor.shutdown(); processor.awaitTermination(Integer.MAX_VALUE, TimeUnit.MILLISECONDS); + completed = true; + progressbar.join(); return 0; } } @@ -168,6 +189,12 @@ public final class Corona extends Configured implements Tool { "commoncrawl warc file to be used when the mode is online."); Option optSource = OptionBuilder.create(SOURCE); + OptionBuilder.withArgName("value"); + OptionBuilder.hasArg(); + OptionBuilder.withDescription("number of threads to be launched " + + "for the run"); + Option optNumOfThreads = OptionBuilder.create(NUM_OF_THREADS); + OptionBuilder.withArgName("value"); OptionBuilder.hasArg(); OptionBuilder.withDescription("specifies number of Volumes to be " + @@ -189,6 +216,7 @@ public final class Corona extends Configured implements Tool { options.addOption(optHelp); options.addOption(optMode); options.addOption(optSource); + options.addOption(optNumOfThreads); options.addOption(optNumOfVolumes); options.addOption(optNumOfBuckets); options.addOption(optNumOfKeys); @@ -204,6 +232,9 @@ public final class Corona extends Configured implements Tool { source = cmdLine.hasOption(SOURCE) ? cmdLine.getOptionValue(SOURCE) : SOURCE_DEFAULT; + numOfThreads = cmdLine.hasOption(NUM_OF_THREADS) ? + cmdLine.getOptionValue(NUM_OF_THREADS) : NUM_OF_THREADS_DEFAULT; + numOfVolumes = cmdLine.hasOption(NUM_OF_VOLUMES) ? cmdLine.getOptionValue(NUM_OF_VOLUMES) : NUM_OF_VOLUMES_DEFAULT; @@ -216,6 +247,8 @@ public final class Corona extends Configured implements Tool { private void usage() { System.out.println("Options supported are:"); + System.out.println("-numOfThreads " + + "number of threads to be launched for the run."); System.out.println("-mode [online | offline] " + "specifies the mode in which Corona should run."); System.out.println("-source " @@ -245,7 +278,9 @@ public final class Corona extends Configured implements Tool { this.totalKeys = Integer.parseInt(numOfKeys); this.volume = volume; LOG.trace("Creating volume: {}", volume); + long start = System.nanoTime(); ozoneClient.createVolume(this.volume); + volumeCreationTime.getAndAdd(System.nanoTime() - start); numberOfVolumesCreated.getAndIncrement(); } @@ -256,7 +291,9 @@ public final class Corona extends Configured implements Tool { RandomStringUtils.randomNumeric(5); try { LOG.trace("Creating bucket: {} in volume: {}", bucket, volume); + long start = System.nanoTime(); ozoneClient.createBucket(volume, bucket); + bucketCreationTime.getAndAdd(System.nanoTime() - start); numberOfBucketsCreated.getAndIncrement(); for (int k = 0; k < totalKeys; k++) { String key = "key-" + k + "-" + @@ -265,17 +302,24 @@ public final class Corona extends Configured implements Tool { try { LOG.trace("Adding key: {} in bucket: {} of volume: {}", key, bucket, volume); + long keyCreateStart = System.nanoTime(); OzoneOutputStream os = ozoneClient.createKey( volume, bucket, key, value.length); + keyCreationTime.getAndAdd(System.nanoTime() - keyCreateStart); + long keyWriteStart = System.nanoTime(); os.write(value); os.close(); + keyWriteTime.getAndAdd(System.nanoTime() - keyWriteStart); + totalBytesWritten.getAndAdd(value.length); numberOfKeysAdded.getAndIncrement(); } catch (Exception e) { + exception = true; LOG.error("Exception while adding key: {} in bucket: {}" + " of volume: {}.", key, bucket, volume, e); } } } catch (Exception e) { + exception = true; LOG.error("Exception while creating bucket: {}" + " in volume: {}.", bucket, volume, e); } @@ -287,11 +331,82 @@ public final class Corona extends Configured implements Tool { * Adds ShutdownHook to print statistics. */ private void addShutdownHook() { - Runtime.getRuntime().addShutdownHook(new Thread() { - public void run() { - printStats(System.out); + Runtime.getRuntime().addShutdownHook( + new Thread(() -> printStats(System.out))); + } + + private Thread getProgressBarThread() { + long maxValue = Integer.parseInt(numOfVolumes) * + Integer.parseInt(numOfBuckets) * + Integer.parseInt(numOfKeys); + Thread progressBarThread = new Thread( + new ProgressBar(System.out, maxValue)); + progressBarThread.setName("ProgressBar"); + return progressBarThread; + } + + private class ProgressBar implements Runnable { + + private final long refreshInterval = 1000L; + + private PrintStream stream; + private long maxValue; + + ProgressBar(PrintStream stream, long maxValue) { + this.stream = stream; + this.maxValue = maxValue; + } + + @Override + public void run() { + try { + stream.println(); + long keys; + while((keys = numberOfKeysAdded.get()) < maxValue) { + print(keys); + if(completed) { + break; + } + Thread.sleep(refreshInterval); + } + if(exception) { + stream.println(); + stream.println("Incomplete termination, " + + "check log for exception."); + } else { + print(maxValue); + } + stream.println(); + } catch (InterruptedException e) { } - }); + } + + /** + * Given current value prints the progress bar. + * + * @param currentValue + */ + private void print(long currentValue) { + stream.print('\r'); + double percent = 100.0 * currentValue / maxValue; + StringBuilder sb = new StringBuilder(); + sb.append(" " + String.format("%.2f", percent) + "% |"); + + for (int i = 0; i <= percent; i++) { + sb.append('█'); + } + for (int j = 0; j < 100 - percent; j++) { + sb.append(' '); + } + sb.append("| "); + sb.append(currentValue + "/" + maxValue); + long timeInSec = TimeUnit.SECONDS.convert( + System.nanoTime() - startTime, TimeUnit.NANOSECONDS); + String timeToPrint = String.format("%d:%02d:%02d", timeInSec / 3600, + (timeInSec % 3600) / 60, timeInSec % 60); + sb.append(" Time: " + timeToPrint); + stream.print(sb); + } } /** @@ -300,15 +415,81 @@ public final class Corona extends Configured implements Tool { * @param out PrintStream */ private void printStats(PrintStream out) { - long timeInSec = TimeUnit.SECONDS.convert( - System.nanoTime() - startTime, TimeUnit.NANOSECONDS); - String timeToPrint = timeInSec < 60 ? timeInSec + " seconds" : - TimeUnit.MINUTES.convert(timeInSec, TimeUnit.SECONDS) + " minuites"; + int threadCount = Integer.parseInt(numOfThreads); + + long endTime = System.nanoTime() - startTime; + String execTime = String.format("%02d:%02d:%02d", + TimeUnit.NANOSECONDS.toHours(endTime), + TimeUnit.NANOSECONDS.toMinutes(endTime) - + TimeUnit.HOURS.toMinutes( + TimeUnit.NANOSECONDS.toHours(endTime)), + TimeUnit.NANOSECONDS.toSeconds(endTime) - + TimeUnit.MINUTES.toSeconds( + TimeUnit.NANOSECONDS.toMinutes(endTime))); + + long volumeTime = volumeCreationTime.longValue(); + String prettyVolumeTime = String.format("%02d:%02d:%02d:%02d", + TimeUnit.NANOSECONDS.toHours(volumeTime), + TimeUnit.NANOSECONDS.toMinutes(volumeTime) - + TimeUnit.HOURS.toMinutes( + TimeUnit.NANOSECONDS.toHours(volumeTime)), + TimeUnit.NANOSECONDS.toSeconds(volumeTime) - + TimeUnit.MINUTES.toSeconds( + TimeUnit.NANOSECONDS.toMinutes(volumeTime)), + TimeUnit.NANOSECONDS.toMillis(volumeTime) - + TimeUnit.SECONDS.toMillis( + TimeUnit.NANOSECONDS.toSeconds(volumeTime))); + + long bucketTime = bucketCreationTime.longValue() / threadCount; + String prettyBucketTime = String.format("%02d:%02d:%02d:%02d", + TimeUnit.NANOSECONDS.toHours(bucketTime), + TimeUnit.NANOSECONDS.toMinutes(bucketTime) - + TimeUnit.HOURS.toMinutes( + TimeUnit.NANOSECONDS.toHours(bucketTime)), + TimeUnit.NANOSECONDS.toSeconds(bucketTime) - + TimeUnit.MINUTES.toSeconds( + TimeUnit.NANOSECONDS.toMinutes(bucketTime)), + TimeUnit.NANOSECONDS.toMillis(bucketTime) - + TimeUnit.SECONDS.toMillis( + TimeUnit.NANOSECONDS.toSeconds(bucketTime))); + + long totalKeyCreationTime = keyCreationTime.longValue() / threadCount; + String prettyKeyCreationTime = String.format("%02d:%02d:%02d:%02d", + TimeUnit.NANOSECONDS.toHours(totalKeyCreationTime), + TimeUnit.NANOSECONDS.toMinutes(totalKeyCreationTime) - + TimeUnit.HOURS.toMinutes( + TimeUnit.NANOSECONDS.toHours(totalKeyCreationTime)), + TimeUnit.NANOSECONDS.toSeconds(totalKeyCreationTime) - + TimeUnit.MINUTES.toSeconds( + TimeUnit.NANOSECONDS.toMinutes(totalKeyCreationTime)), + TimeUnit.NANOSECONDS.toMillis(totalKeyCreationTime) - + TimeUnit.SECONDS.toMillis( + TimeUnit.NANOSECONDS.toSeconds(totalKeyCreationTime))); + + long totalKeyWriteTime = keyWriteTime.longValue() / threadCount; + String prettyKeyWriteTime = String.format("%02d:%02d:%02d:%02d", + TimeUnit.NANOSECONDS.toHours(totalKeyWriteTime), + TimeUnit.NANOSECONDS.toMinutes(totalKeyWriteTime) - + TimeUnit.HOURS.toMinutes( + TimeUnit.NANOSECONDS.toHours(totalKeyWriteTime)), + TimeUnit.NANOSECONDS.toSeconds(totalKeyWriteTime) - + TimeUnit.MINUTES.toSeconds( + TimeUnit.NANOSECONDS.toMinutes(totalKeyWriteTime)), + TimeUnit.NANOSECONDS.toMillis(totalKeyWriteTime) - + TimeUnit.SECONDS.toMillis( + TimeUnit.NANOSECONDS.toSeconds(totalKeyWriteTime))); + + out.println(); out.println("***************************************************"); out.println("Number of Volumes created: " + numberOfVolumesCreated); out.println("Number of Buckets created: " + numberOfBucketsCreated); out.println("Number of Keys added: " + numberOfKeysAdded); - out.println("Execution time: " + timeToPrint); + out.println("Time spent in volume creation: " + prettyVolumeTime); + out.println("Time spent in bucket creation: " + prettyBucketTime); + out.println("Time spent in key creation: " + prettyKeyCreationTime); + out.println("Time spent in writing keys: " + prettyKeyWriteTime); + out.println("Total bytes written: " + totalBytesWritten); + out.println("Total Execution time: " + execTime); out.println("***************************************************"); }