diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/RandomKeyGenerator.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/RandomKeyGenerator.java index 6e1e02ccdb..5198ac3e7d 100644 --- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/RandomKeyGenerator.java +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/RandomKeyGenerator.java @@ -25,9 +25,11 @@ import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.HashMap; +import java.util.Map; import java.util.UUID; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -95,8 +97,6 @@ enum FreonOps { KEY_WRITE } - private static final String RATIS = "ratis"; - private static final String DURATION_FORMAT = "HH:mm:ss,SSS"; private static final int QUANTILES = 10; @@ -112,8 +112,8 @@ enum FreonOps { private static final Logger LOG = LoggerFactory.getLogger(RandomKeyGenerator.class); - private boolean completed = false; - private Exception exception = null; + private volatile boolean completed = false; + private volatile Exception exception = null; @Option(names = "--numOfThreads", description = "number of threads to be launched for the run", @@ -193,6 +193,14 @@ enum FreonOps { private AtomicLong totalBytesWritten; + private int totalBucketCount; + private long totalKeyCount; + private AtomicInteger volumeCounter; + private AtomicInteger bucketCounter; + private AtomicLong keyCounter; + private Map volumes; + private Map buckets; + private AtomicInteger numberOfVolumesCreated; private AtomicInteger numberOfBucketsCreated; private AtomicLong numberOfKeysAdded; @@ -226,6 +234,11 @@ public void init(OzoneConfiguration configuration) throws IOException { numberOfVolumesCreated = new AtomicInteger(); numberOfBucketsCreated = new AtomicInteger(); numberOfKeysAdded = new AtomicLong(); + volumeCounter = new AtomicInteger(); + bucketCounter = new AtomicInteger(); + keyCounter = new AtomicLong(); + volumes = new ConcurrentHashMap<>(); + buckets = new ConcurrentHashMap<>(); ozoneClient = OzoneClientFactory.getClient(configuration); objectStore = ozoneClient.getObjectStore(); for (FreonOps ops : FreonOps.values()) { @@ -259,6 +272,9 @@ public Void call() throws Exception { } } + totalBucketCount = numOfVolumes * numOfBuckets; + totalKeyCount = totalBucketCount * numOfKeys; + LOG.info("Number of Threads: " + numOfThreads); threadPoolSize = numOfThreads; executor = Executors.newFixedThreadPool(threadPoolSize); @@ -269,9 +285,8 @@ public Void call() throws Exception { LOG.info("Number of Keys per Bucket: {}.", numOfKeys); LOG.info("Key size: {} bytes", keySize); LOG.info("Buffer size: {} bytes", bufferSize); - for (int i = 0; i < numOfVolumes; i++) { - String volumeName = "vol-" + i + "-" + RandomStringUtils.randomNumeric(5); - executor.submit(new VolumeProcessor(volumeName)); + for (int i = 0; i < numOfThreads; i++) { + executor.submit(new ObjectCreator()); } Thread validator = null; @@ -286,22 +301,15 @@ public Void call() throws Exception { LOG.info("Data validation is enabled."); } - Supplier currentValue; - long maxValue; - - currentValue = () -> numberOfKeysAdded.get(); - maxValue = numOfVolumes * - numOfBuckets * - numOfKeys; - - progressbar = new ProgressBar(System.out, maxValue, currentValue); + Supplier currentValue = numberOfKeysAdded::get; + progressbar = new ProgressBar(System.out, totalKeyCount, currentValue); LOG.info("Starting progress bar Thread."); progressbar.start(); // wait until all keys are added or exception occurred. - while ((numberOfKeysAdded.get() != numOfVolumes * numOfBuckets * numOfKeys) + while ((numberOfKeysAdded.get() != totalKeyCount) && exception == null) { try { Thread.sleep(CHECK_INTERVAL_MILLIS); @@ -570,7 +578,7 @@ private static class KeyValidate { * * @param bucket bucket part * @param keyName key part - * @param keyName digest of this key's full value + * @param digest digest of this key's full value */ KeyValidate(OzoneBucket bucket, String keyName, byte[] digest) { this.bucket = bucket; @@ -579,146 +587,178 @@ private static class KeyValidate { } } - private class VolumeProcessor implements Runnable { - private String volumeName; - - VolumeProcessor(String volumeName) { - this.volumeName = volumeName; - } - + private class ObjectCreator implements Runnable { @Override - @SuppressFBWarnings("REC_CATCH_EXCEPTION") public void run() { - LOG.trace("Creating volume: {}", volumeName); - long start = System.nanoTime(); - OzoneVolume volume; - try (Scope scope = GlobalTracer.get().buildSpan("createVolume") - .startActive(true)) { - objectStore.createVolume(volumeName); - long volumeCreationDuration = System.nanoTime() - start; - volumeCreationTime.getAndAdd(volumeCreationDuration); - histograms.get(FreonOps.VOLUME_CREATE.ordinal()) - .update(volumeCreationDuration); - numberOfVolumesCreated.getAndIncrement(); - volume = objectStore.getVolume(volumeName); - } catch (IOException e) { - exception = e; - LOG.error("Could not create volume", e); - return; + int v; + while ((v = volumeCounter.getAndIncrement()) < numOfVolumes) { + if (!createVolume(v)) { + return; + } } - for (int i = 0; i < numOfBuckets; i++) { - String bucketName = "bucket-" + i + "-" + - RandomStringUtils.randomNumeric(5); - BucketProcessor bp = new BucketProcessor(volume, bucketName); - executor.submit(bp); + int b; + while ((b = bucketCounter.getAndIncrement()) < totalBucketCount) { + if (!createBucket(b)) { + return; + } + } + + long k; + while ((k = keyCounter.getAndIncrement()) < totalKeyCount) { + if (!createKey(k)) { + return; + } } } } - private class BucketProcessor implements Runnable { - private OzoneVolume volume; - private String bucketName; - - BucketProcessor(OzoneVolume volume, String bucketName) { - this.volume = volume; - this.bucketName = bucketName; - } - - @Override - @SuppressFBWarnings("REC_CATCH_EXCEPTION") - public void run() { - LOG.trace("Creating bucket: {} in volume: {}", - bucketName, volume.getName()); + private boolean createVolume(int volumeNumber) { + String volumeName = "vol-" + volumeNumber + "-" + + RandomStringUtils.randomNumeric(5); + LOG.trace("Creating volume: {}", volumeName); + try (Scope ignored = GlobalTracer.get().buildSpan("createVolume") + .startActive(true)) { long start = System.nanoTime(); - OzoneBucket bucket; - try (Scope scope = GlobalTracer.get().buildSpan("createBucket") - .startActive(true)) { - volume.createBucket(bucketName); - long bucketCreationDuration = System.nanoTime() - start; - histograms.get(FreonOps.BUCKET_CREATE.ordinal()) - .update(bucketCreationDuration); - bucketCreationTime.getAndAdd(bucketCreationDuration); - numberOfBucketsCreated.getAndIncrement(); + objectStore.createVolume(volumeName); + long volumeCreationDuration = System.nanoTime() - start; + volumeCreationTime.getAndAdd(volumeCreationDuration); + histograms.get(FreonOps.VOLUME_CREATE.ordinal()) + .update(volumeCreationDuration); + numberOfVolumesCreated.getAndIncrement(); - bucket = volume.getBucket(bucketName); - } catch (IOException e) { - exception = e; - LOG.error("Could not create bucket ", e); - return; - } - - for (int i = 0; i < numOfKeys; i++) { - String keyName = "key-" + i + "-" + RandomStringUtils.randomNumeric(5); - KeyProcessor kp = new KeyProcessor(bucket, keyName); - executor.submit(kp); - } + OzoneVolume volume = objectStore.getVolume(volumeName); + volumes.put(volumeNumber, volume); + return true; + } catch (IOException e) { + exception = e; + LOG.error("Could not create volume", e); + return false; } } - private class KeyProcessor implements Runnable { - private OzoneBucket bucket; - private String keyName; - - KeyProcessor(OzoneBucket bucket, String keyName) { - this.bucket = bucket; - this.keyName = keyName; + private boolean createBucket(int globalBucketNumber) { + int volumeNumber = globalBucketNumber % numOfVolumes; + int bucketNumber = globalBucketNumber / numOfVolumes; + OzoneVolume volume = getVolume(volumeNumber); + if (volume == null) { + return false; } + String bucketName = "bucket-" + bucketNumber + "-" + + RandomStringUtils.randomNumeric(5); + LOG.trace("Creating bucket: {} in volume: {}", + bucketName, volume.getName()); + try (Scope ignored = GlobalTracer.get().buildSpan("createBucket") + .startActive(true)) { + long start = System.nanoTime(); + volume.createBucket(bucketName); + long bucketCreationDuration = System.nanoTime() - start; + histograms.get(FreonOps.BUCKET_CREATE.ordinal()) + .update(bucketCreationDuration); + bucketCreationTime.getAndAdd(bucketCreationDuration); + numberOfBucketsCreated.getAndIncrement(); - @Override - @SuppressFBWarnings("REC_CATCH_EXCEPTION") - public void run() { - String bucketName = bucket.getName(); - String volumeName = bucket.getVolumeName(); - LOG.trace("Adding key: {} in bucket: {} of volume: {}", - keyName, bucketName, volumeName); - byte[] randomValue = DFSUtil.string2Bytes(UUID.randomUUID().toString()); - try { + OzoneBucket bucket = volume.getBucket(bucketName); + buckets.put(globalBucketNumber, bucket); + return true; + } catch (IOException e) { + exception = e; + LOG.error("Could not create bucket ", e); + return false; + } + } + + @SuppressFBWarnings("REC_CATCH_EXCEPTION") + private boolean createKey(long globalKeyNumber) { + int globalBucketNumber = (int) (globalKeyNumber % totalBucketCount); + long keyNumber = globalKeyNumber / totalBucketCount; + OzoneBucket bucket = getBucket(globalBucketNumber); + if (bucket == null) { + return false; + } + String bucketName = bucket.getName(); + String volumeName = bucket.getVolumeName(); + String keyName = "key-" + keyNumber + "-" + + RandomStringUtils.randomNumeric(5); + LOG.trace("Adding key: {} in bucket: {} of volume: {}", + keyName, bucketName, volumeName); + byte[] randomValue = DFSUtil.string2Bytes(UUID.randomUUID().toString()); + try { + try (Scope scope = GlobalTracer.get().buildSpan("createKey") + .startActive(true)) { long keyCreateStart = System.nanoTime(); - try (Scope scope = GlobalTracer.get().buildSpan("createKey") + OzoneOutputStream os = bucket.createKey(keyName, keySize, type, + factor, new HashMap<>()); + long keyCreationDuration = System.nanoTime() - keyCreateStart; + histograms.get(FreonOps.KEY_CREATE.ordinal()) + .update(keyCreationDuration); + keyCreationTime.getAndAdd(keyCreationDuration); + + try (Scope writeScope = GlobalTracer.get().buildSpan("writeKeyData") .startActive(true)) { - OzoneOutputStream os = bucket.createKey(keyName, keySize, type, - factor, new HashMap<>()); - long keyCreationDuration = System.nanoTime() - keyCreateStart; - histograms.get(FreonOps.KEY_CREATE.ordinal()) - .update(keyCreationDuration); - keyCreationTime.getAndAdd(keyCreationDuration); - long keyWriteStart = System.nanoTime(); - try (Scope writeScope = GlobalTracer.get().buildSpan("writeKeyData") - .startActive(true)) { - for (long nrRemaining = keySize - randomValue.length; - nrRemaining > 0; nrRemaining -= bufferSize) { - int curSize = (int)Math.min(bufferSize, nrRemaining); - os.write(keyValueBuffer, 0, curSize); - } - os.write(randomValue); - os.close(); - - long keyWriteDuration = System.nanoTime() - keyWriteStart; - histograms.get(FreonOps.KEY_WRITE.ordinal()) - .update(keyWriteDuration); - keyWriteTime.getAndAdd(keyWriteDuration); - totalBytesWritten.getAndAdd(keySize); - numberOfKeysAdded.getAndIncrement(); + for (long nrRemaining = keySize - randomValue.length; + nrRemaining > 0; nrRemaining -= bufferSize) { + int curSize = (int) Math.min(bufferSize, nrRemaining); + os.write(keyValueBuffer, 0, curSize); } - } + os.write(randomValue); + os.close(); - if (validateWrites) { - MessageDigest tmpMD = (MessageDigest)commonInitialMD.clone(); - tmpMD.update(randomValue); - boolean validate = validationQueue.offer( - new KeyValidate(bucket, keyName, tmpMD.digest())); - if (validate) { - LOG.trace("Key {}, is queued for validation.", keyName); - } + long keyWriteDuration = System.nanoTime() - keyWriteStart; + histograms.get(FreonOps.KEY_WRITE.ordinal()) + .update(keyWriteDuration); + keyWriteTime.getAndAdd(keyWriteDuration); + totalBytesWritten.getAndAdd(keySize); + numberOfKeysAdded.getAndIncrement(); } - } catch (Exception e) { - exception = e; - LOG.error("Exception while adding key: {} in bucket: {}" + - " of volume: {}.", keyName, bucketName, volumeName, e); + } + + if (validateWrites) { + MessageDigest tmpMD = (MessageDigest) commonInitialMD.clone(); + tmpMD.update(randomValue); + boolean validate = validationQueue.offer( + new KeyValidate(bucket, keyName, tmpMD.digest())); + if (validate) { + LOG.trace("Key {} is queued for validation.", keyName); + } + } + + return true; + } catch (Exception e) { + exception = e; + LOG.error("Exception while adding key: {} in bucket: {}" + + " of volume: {}.", keyName, bucketName, volumeName, e); + return false; + } + } + + private OzoneVolume getVolume(Integer volumeNumber) { + return waitUntilAddedToMap(volumes, volumeNumber); + } + + private OzoneBucket getBucket(Integer bucketNumber) { + return waitUntilAddedToMap(buckets, bucketNumber); + } + + /** + * Looks up volume or bucket from the cache. Waits for it to be created if + * needed (can happen for the last few items depending on the number of + * threads). + * + * @return may return null if this thread is interrupted, or if any other + * thread encounters an exception (and stores it to {@code exception}) + */ + private T waitUntilAddedToMap(Map map, Integer i) { + while (exception == null && !map.containsKey(i)) { + try { + Thread.sleep(10); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return null; } } + return map.get(i); } private final class FreonJobInfo {