HDFS-12180. Ozone: Corona: Add stats and progress bar to corona. Contributed by Nandakumar.
This commit is contained in:
parent
a4b3160efb
commit
fcd4537e63
@ -55,7 +55,11 @@ import org.slf4j.LoggerFactory;
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.*;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
|
@ -77,6 +77,7 @@ public final class Corona extends Configured implements Tool {
|
||||
private static final String HELP = "help";
|
||||
private static final String MODE = "mode";
|
||||
private static final String SOURCE = "source";
|
||||
private static final String NUM_OF_THREADS = "numOfThreads";
|
||||
private static final String NUM_OF_VOLUMES = "numOfVolumes";
|
||||
private static final String NUM_OF_BUCKETS = "numOfBuckets";
|
||||
private static final String NUM_OF_KEYS = "numOfKeys";
|
||||
@ -85,19 +86,21 @@ public final class Corona extends Configured implements Tool {
|
||||
private static final String SOURCE_DEFAULT =
|
||||
"https://commoncrawl.s3.amazonaws.com/" +
|
||||
"crawl-data/CC-MAIN-2017-17/warc.paths.gz";
|
||||
private static final String NUM_OF_THREADS_DEFAULT = "10";
|
||||
private static final String NUM_OF_VOLUMES_DEFAULT = "10";
|
||||
private static final String NUM_OF_BUCKETS_DEFAULT = "1000";
|
||||
private static final String NUM_OF_KEYS_DEFAULT = "500000";
|
||||
|
||||
private static final int NUM_OF_THREADS_DEFAULT = 10;
|
||||
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(Corona.class);
|
||||
|
||||
private boolean printUsage = false;
|
||||
private boolean completed = false;
|
||||
private boolean exception = false;
|
||||
|
||||
private String mode;
|
||||
private String source;
|
||||
private String numOfThreads;
|
||||
private String numOfVolumes;
|
||||
private String numOfBuckets;
|
||||
private String numOfKeys;
|
||||
@ -107,18 +110,29 @@ public final class Corona extends Configured implements Tool {
|
||||
|
||||
private long startTime;
|
||||
|
||||
private AtomicLong volumeCreationTime;
|
||||
private AtomicLong bucketCreationTime;
|
||||
private AtomicLong keyCreationTime;
|
||||
private AtomicLong keyWriteTime;
|
||||
|
||||
private AtomicLong totalBytesWritten;
|
||||
|
||||
private AtomicInteger numberOfVolumesCreated;
|
||||
private AtomicInteger numberOfBucketsCreated;
|
||||
private AtomicLong numberOfKeysAdded;
|
||||
|
||||
private Corona(Configuration conf) throws IOException {
|
||||
startTime = System.nanoTime();
|
||||
volumeCreationTime = new AtomicLong();
|
||||
bucketCreationTime = new AtomicLong();
|
||||
keyCreationTime = new AtomicLong();
|
||||
keyWriteTime = new AtomicLong();
|
||||
totalBytesWritten = new AtomicLong();
|
||||
numberOfVolumesCreated = new AtomicInteger();
|
||||
numberOfBucketsCreated = new AtomicInteger();
|
||||
numberOfKeysAdded = new AtomicLong();
|
||||
OzoneClientFactory.setConfiguration(conf);
|
||||
ozoneClient = OzoneClientFactory.getRpcClient();
|
||||
processor = Executors.newFixedThreadPool(NUM_OF_THREADS_DEFAULT);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -130,13 +144,15 @@ public final class Corona extends Configured implements Tool {
|
||||
usage();
|
||||
System.exit(0);
|
||||
}
|
||||
LOG.info("Number of Threads: " + numOfThreads);
|
||||
processor = Executors.newFixedThreadPool(Integer.parseInt(numOfThreads));
|
||||
addShutdownHook();
|
||||
if(mode.equals("online")) {
|
||||
LOG.info("Mode: online");
|
||||
throw new UnsupportedOperationException("Not yet implemented.");
|
||||
} else {
|
||||
LOG.info("Mode: offline");
|
||||
LOG.info("Number of Volumes: {}.", numOfBuckets);
|
||||
LOG.info("Number of Volumes: {}.", numOfVolumes);
|
||||
LOG.info("Number of Buckets per Volume: {}.", numOfBuckets);
|
||||
LOG.info("Number of Keys per Bucket: {}.", numOfKeys);
|
||||
for(int i = 0; i < Integer.parseInt(numOfVolumes); i++) {
|
||||
@ -144,8 +160,13 @@ public final class Corona extends Configured implements Tool {
|
||||
RandomStringUtils.randomNumeric(5);
|
||||
processor.submit(new OfflineProcessor(volume));
|
||||
}
|
||||
Thread progressbar = getProgressBarThread();
|
||||
LOG.info("Starting progress bar Thread.");
|
||||
progressbar.start();
|
||||
processor.shutdown();
|
||||
processor.awaitTermination(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
|
||||
completed = true;
|
||||
progressbar.join();
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
@ -168,6 +189,12 @@ public final class Corona extends Configured implements Tool {
|
||||
"commoncrawl warc file to be used when the mode is online.");
|
||||
Option optSource = OptionBuilder.create(SOURCE);
|
||||
|
||||
OptionBuilder.withArgName("value");
|
||||
OptionBuilder.hasArg();
|
||||
OptionBuilder.withDescription("number of threads to be launched " +
|
||||
"for the run");
|
||||
Option optNumOfThreads = OptionBuilder.create(NUM_OF_THREADS);
|
||||
|
||||
OptionBuilder.withArgName("value");
|
||||
OptionBuilder.hasArg();
|
||||
OptionBuilder.withDescription("specifies number of Volumes to be " +
|
||||
@ -189,6 +216,7 @@ public final class Corona extends Configured implements Tool {
|
||||
options.addOption(optHelp);
|
||||
options.addOption(optMode);
|
||||
options.addOption(optSource);
|
||||
options.addOption(optNumOfThreads);
|
||||
options.addOption(optNumOfVolumes);
|
||||
options.addOption(optNumOfBuckets);
|
||||
options.addOption(optNumOfKeys);
|
||||
@ -204,6 +232,9 @@ public final class Corona extends Configured implements Tool {
|
||||
source = cmdLine.hasOption(SOURCE) ?
|
||||
cmdLine.getOptionValue(SOURCE) : SOURCE_DEFAULT;
|
||||
|
||||
numOfThreads = cmdLine.hasOption(NUM_OF_THREADS) ?
|
||||
cmdLine.getOptionValue(NUM_OF_THREADS) : NUM_OF_THREADS_DEFAULT;
|
||||
|
||||
numOfVolumes = cmdLine.hasOption(NUM_OF_VOLUMES) ?
|
||||
cmdLine.getOptionValue(NUM_OF_VOLUMES) : NUM_OF_VOLUMES_DEFAULT;
|
||||
|
||||
@ -216,6 +247,8 @@ public final class Corona extends Configured implements Tool {
|
||||
|
||||
private void usage() {
|
||||
System.out.println("Options supported are:");
|
||||
System.out.println("-numOfThreads <value> "
|
||||
+ "number of threads to be launched for the run.");
|
||||
System.out.println("-mode [online | offline] "
|
||||
+ "specifies the mode in which Corona should run.");
|
||||
System.out.println("-source <url> "
|
||||
@ -245,7 +278,9 @@ public final class Corona extends Configured implements Tool {
|
||||
this.totalKeys = Integer.parseInt(numOfKeys);
|
||||
this.volume = volume;
|
||||
LOG.trace("Creating volume: {}", volume);
|
||||
long start = System.nanoTime();
|
||||
ozoneClient.createVolume(this.volume);
|
||||
volumeCreationTime.getAndAdd(System.nanoTime() - start);
|
||||
numberOfVolumesCreated.getAndIncrement();
|
||||
}
|
||||
|
||||
@ -256,7 +291,9 @@ public final class Corona extends Configured implements Tool {
|
||||
RandomStringUtils.randomNumeric(5);
|
||||
try {
|
||||
LOG.trace("Creating bucket: {} in volume: {}", bucket, volume);
|
||||
long start = System.nanoTime();
|
||||
ozoneClient.createBucket(volume, bucket);
|
||||
bucketCreationTime.getAndAdd(System.nanoTime() - start);
|
||||
numberOfBucketsCreated.getAndIncrement();
|
||||
for (int k = 0; k < totalKeys; k++) {
|
||||
String key = "key-" + k + "-" +
|
||||
@ -265,17 +302,24 @@ public final class Corona extends Configured implements Tool {
|
||||
try {
|
||||
LOG.trace("Adding key: {} in bucket: {} of volume: {}",
|
||||
key, bucket, volume);
|
||||
long keyCreateStart = System.nanoTime();
|
||||
OzoneOutputStream os = ozoneClient.createKey(
|
||||
volume, bucket, key, value.length);
|
||||
keyCreationTime.getAndAdd(System.nanoTime() - keyCreateStart);
|
||||
long keyWriteStart = System.nanoTime();
|
||||
os.write(value);
|
||||
os.close();
|
||||
keyWriteTime.getAndAdd(System.nanoTime() - keyWriteStart);
|
||||
totalBytesWritten.getAndAdd(value.length);
|
||||
numberOfKeysAdded.getAndIncrement();
|
||||
} catch (Exception e) {
|
||||
exception = true;
|
||||
LOG.error("Exception while adding key: {} in bucket: {}" +
|
||||
" of volume: {}.", key, bucket, volume, e);
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
exception = true;
|
||||
LOG.error("Exception while creating bucket: {}" +
|
||||
" in volume: {}.", bucket, volume, e);
|
||||
}
|
||||
@ -287,11 +331,82 @@ public final class Corona extends Configured implements Tool {
|
||||
* Adds ShutdownHook to print statistics.
|
||||
*/
|
||||
private void addShutdownHook() {
|
||||
Runtime.getRuntime().addShutdownHook(new Thread() {
|
||||
public void run() {
|
||||
printStats(System.out);
|
||||
Runtime.getRuntime().addShutdownHook(
|
||||
new Thread(() -> printStats(System.out)));
|
||||
}
|
||||
|
||||
private Thread getProgressBarThread() {
|
||||
long maxValue = Integer.parseInt(numOfVolumes) *
|
||||
Integer.parseInt(numOfBuckets) *
|
||||
Integer.parseInt(numOfKeys);
|
||||
Thread progressBarThread = new Thread(
|
||||
new ProgressBar(System.out, maxValue));
|
||||
progressBarThread.setName("ProgressBar");
|
||||
return progressBarThread;
|
||||
}
|
||||
|
||||
private class ProgressBar implements Runnable {
|
||||
|
||||
private final long refreshInterval = 1000L;
|
||||
|
||||
private PrintStream stream;
|
||||
private long maxValue;
|
||||
|
||||
ProgressBar(PrintStream stream, long maxValue) {
|
||||
this.stream = stream;
|
||||
this.maxValue = maxValue;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
stream.println();
|
||||
long keys;
|
||||
while((keys = numberOfKeysAdded.get()) < maxValue) {
|
||||
print(keys);
|
||||
if(completed) {
|
||||
break;
|
||||
}
|
||||
Thread.sleep(refreshInterval);
|
||||
}
|
||||
if(exception) {
|
||||
stream.println();
|
||||
stream.println("Incomplete termination, " +
|
||||
"check log for exception.");
|
||||
} else {
|
||||
print(maxValue);
|
||||
}
|
||||
stream.println();
|
||||
} catch (InterruptedException e) {
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Given current value prints the progress bar.
|
||||
*
|
||||
* @param currentValue
|
||||
*/
|
||||
private void print(long currentValue) {
|
||||
stream.print('\r');
|
||||
double percent = 100.0 * currentValue / maxValue;
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append(" " + String.format("%.2f", percent) + "% |");
|
||||
|
||||
for (int i = 0; i <= percent; i++) {
|
||||
sb.append('█');
|
||||
}
|
||||
for (int j = 0; j < 100 - percent; j++) {
|
||||
sb.append(' ');
|
||||
}
|
||||
sb.append("| ");
|
||||
sb.append(currentValue + "/" + maxValue);
|
||||
long timeInSec = TimeUnit.SECONDS.convert(
|
||||
System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
|
||||
String timeToPrint = String.format("%d:%02d:%02d", timeInSec / 3600,
|
||||
(timeInSec % 3600) / 60, timeInSec % 60);
|
||||
sb.append(" Time: " + timeToPrint);
|
||||
stream.print(sb);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@ -300,15 +415,81 @@ public final class Corona extends Configured implements Tool {
|
||||
* @param out PrintStream
|
||||
*/
|
||||
private void printStats(PrintStream out) {
|
||||
long timeInSec = TimeUnit.SECONDS.convert(
|
||||
System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
|
||||
String timeToPrint = timeInSec < 60 ? timeInSec + " seconds" :
|
||||
TimeUnit.MINUTES.convert(timeInSec, TimeUnit.SECONDS) + " minuites";
|
||||
int threadCount = Integer.parseInt(numOfThreads);
|
||||
|
||||
long endTime = System.nanoTime() - startTime;
|
||||
String execTime = String.format("%02d:%02d:%02d",
|
||||
TimeUnit.NANOSECONDS.toHours(endTime),
|
||||
TimeUnit.NANOSECONDS.toMinutes(endTime) -
|
||||
TimeUnit.HOURS.toMinutes(
|
||||
TimeUnit.NANOSECONDS.toHours(endTime)),
|
||||
TimeUnit.NANOSECONDS.toSeconds(endTime) -
|
||||
TimeUnit.MINUTES.toSeconds(
|
||||
TimeUnit.NANOSECONDS.toMinutes(endTime)));
|
||||
|
||||
long volumeTime = volumeCreationTime.longValue();
|
||||
String prettyVolumeTime = String.format("%02d:%02d:%02d:%02d",
|
||||
TimeUnit.NANOSECONDS.toHours(volumeTime),
|
||||
TimeUnit.NANOSECONDS.toMinutes(volumeTime) -
|
||||
TimeUnit.HOURS.toMinutes(
|
||||
TimeUnit.NANOSECONDS.toHours(volumeTime)),
|
||||
TimeUnit.NANOSECONDS.toSeconds(volumeTime) -
|
||||
TimeUnit.MINUTES.toSeconds(
|
||||
TimeUnit.NANOSECONDS.toMinutes(volumeTime)),
|
||||
TimeUnit.NANOSECONDS.toMillis(volumeTime) -
|
||||
TimeUnit.SECONDS.toMillis(
|
||||
TimeUnit.NANOSECONDS.toSeconds(volumeTime)));
|
||||
|
||||
long bucketTime = bucketCreationTime.longValue() / threadCount;
|
||||
String prettyBucketTime = String.format("%02d:%02d:%02d:%02d",
|
||||
TimeUnit.NANOSECONDS.toHours(bucketTime),
|
||||
TimeUnit.NANOSECONDS.toMinutes(bucketTime) -
|
||||
TimeUnit.HOURS.toMinutes(
|
||||
TimeUnit.NANOSECONDS.toHours(bucketTime)),
|
||||
TimeUnit.NANOSECONDS.toSeconds(bucketTime) -
|
||||
TimeUnit.MINUTES.toSeconds(
|
||||
TimeUnit.NANOSECONDS.toMinutes(bucketTime)),
|
||||
TimeUnit.NANOSECONDS.toMillis(bucketTime) -
|
||||
TimeUnit.SECONDS.toMillis(
|
||||
TimeUnit.NANOSECONDS.toSeconds(bucketTime)));
|
||||
|
||||
long totalKeyCreationTime = keyCreationTime.longValue() / threadCount;
|
||||
String prettyKeyCreationTime = String.format("%02d:%02d:%02d:%02d",
|
||||
TimeUnit.NANOSECONDS.toHours(totalKeyCreationTime),
|
||||
TimeUnit.NANOSECONDS.toMinutes(totalKeyCreationTime) -
|
||||
TimeUnit.HOURS.toMinutes(
|
||||
TimeUnit.NANOSECONDS.toHours(totalKeyCreationTime)),
|
||||
TimeUnit.NANOSECONDS.toSeconds(totalKeyCreationTime) -
|
||||
TimeUnit.MINUTES.toSeconds(
|
||||
TimeUnit.NANOSECONDS.toMinutes(totalKeyCreationTime)),
|
||||
TimeUnit.NANOSECONDS.toMillis(totalKeyCreationTime) -
|
||||
TimeUnit.SECONDS.toMillis(
|
||||
TimeUnit.NANOSECONDS.toSeconds(totalKeyCreationTime)));
|
||||
|
||||
long totalKeyWriteTime = keyWriteTime.longValue() / threadCount;
|
||||
String prettyKeyWriteTime = String.format("%02d:%02d:%02d:%02d",
|
||||
TimeUnit.NANOSECONDS.toHours(totalKeyWriteTime),
|
||||
TimeUnit.NANOSECONDS.toMinutes(totalKeyWriteTime) -
|
||||
TimeUnit.HOURS.toMinutes(
|
||||
TimeUnit.NANOSECONDS.toHours(totalKeyWriteTime)),
|
||||
TimeUnit.NANOSECONDS.toSeconds(totalKeyWriteTime) -
|
||||
TimeUnit.MINUTES.toSeconds(
|
||||
TimeUnit.NANOSECONDS.toMinutes(totalKeyWriteTime)),
|
||||
TimeUnit.NANOSECONDS.toMillis(totalKeyWriteTime) -
|
||||
TimeUnit.SECONDS.toMillis(
|
||||
TimeUnit.NANOSECONDS.toSeconds(totalKeyWriteTime)));
|
||||
|
||||
out.println();
|
||||
out.println("***************************************************");
|
||||
out.println("Number of Volumes created: " + numberOfVolumesCreated);
|
||||
out.println("Number of Buckets created: " + numberOfBucketsCreated);
|
||||
out.println("Number of Keys added: " + numberOfKeysAdded);
|
||||
out.println("Execution time: " + timeToPrint);
|
||||
out.println("Time spent in volume creation: " + prettyVolumeTime);
|
||||
out.println("Time spent in bucket creation: " + prettyBucketTime);
|
||||
out.println("Time spent in key creation: " + prettyKeyCreationTime);
|
||||
out.println("Time spent in writing keys: " + prettyKeyWriteTime);
|
||||
out.println("Total bytes written: " + totalBytesWritten);
|
||||
out.println("Total Execution time: " + execTime);
|
||||
out.println("***************************************************");
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user