diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ProgressBar.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ProgressBar.java index a987eeadde..a8d7e73c73 100644 --- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ProgressBar.java +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ProgressBar.java @@ -14,134 +14,197 @@ * License for the specific language governing permissions and limitations under * the License. */ - package org.apache.hadoop.ozone.freon; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.io.PrintStream; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; import java.util.function.Supplier; /** - * Creates and runs a ProgressBar in new Thread which gets printed on - * the provided PrintStream. + * Run an arbitrary code and print progress on the provided stream. The + * progressbar stops when: - the provided currentvalue is less the the maxvalue + * - exception thrown */ public class ProgressBar { - private static final Logger LOG = LoggerFactory.getLogger(ProgressBar.class); private static final long REFRESH_INTERVAL = 1000L; - private final long maxValue; - private final Supplier currentValue; - private final Thread progressBar; - - private volatile boolean running; - - private volatile long startTime; + private PrintStream stream; + private AtomicLong currentValue; + private long maxValue; + private Thread progressBar; + private volatile boolean exception = false; + private long startTime; /** - * Creates a new ProgressBar instance which prints the progress on the given - * PrintStream when started. - * - * @param stream to display the progress + * @param stream Used to display the progress * @param maxValue Maximum value of the progress - * @param currentValue Supplier that provides the current value */ - public ProgressBar(final PrintStream stream, final Long maxValue, - final Supplier currentValue) { + ProgressBar(PrintStream stream, long maxValue) { + this.stream = stream; this.maxValue = maxValue; - this.currentValue = currentValue; - this.progressBar = new Thread(getProgressBar(stream)); - this.running = false; + this.currentValue = new AtomicLong(0); + this.progressBar = new Thread(new ProgressBarThread()); } /** - * Starts the ProgressBar in a new Thread. - * This is a non blocking call. - */ - public synchronized void start() { - if (!running) { - running = true; - startTime = System.nanoTime(); - progressBar.start(); - } - } - - /** - * Graceful shutdown, waits for the progress bar to complete. - * This is a blocking call. - */ - public synchronized void shutdown() { - if (running) { - try { - progressBar.join(); - running = false; - } catch (InterruptedException e) { - LOG.warn("Got interrupted while waiting for the progress bar to " + - "complete."); - } - } - } - - /** - * Terminates the progress bar. This doesn't wait for the progress bar - * to complete. - */ - public synchronized void terminate() { - if (running) { - try { - running = false; - progressBar.join(); - } catch (InterruptedException e) { - LOG.warn("Got interrupted while waiting for the progress bar to " + - "complete."); - } - } - } - - private Runnable getProgressBar(final PrintStream stream) { - return () -> { - stream.println(); - while (running && currentValue.get() < maxValue) { - print(stream, currentValue.get()); - try { - Thread.sleep(REFRESH_INTERVAL); - } catch (InterruptedException e) { - LOG.warn("ProgressBar was interrupted."); - } - } - print(stream, maxValue); - stream.println(); - running = false; - }; - } - - /** - * Given current value prints the progress bar. + * Start a task with a progessbar without any in/out parameters Runnable used + * just a task wrapper. * - * @param value current progress position + * @param task Runnable */ - private void print(final PrintStream stream, final long value) { - stream.print('\r'); - double percent = 100.0 * value / maxValue; - StringBuilder sb = new StringBuilder(); - sb.append(" " + String.format("%.2f", percent) + "% |"); + public void start(Runnable task) { - for (int i = 0; i <= percent; i++) { - sb.append('█'); + startTime = System.nanoTime(); + + try { + + progressBar.start(); + task.run(); + + } catch (Exception e) { + exception = true; + } finally { + + try { + progressBar.join(); + } catch (InterruptedException e) { + e.printStackTrace(); + } } - for (int j = 0; j < 100 - percent; j++) { - sb.append(' '); - } - sb.append("| "); - sb.append(value + "/" + maxValue); - long timeInSec = TimeUnit.SECONDS.convert( - System.nanoTime() - startTime, TimeUnit.NANOSECONDS); - String timeToPrint = String.format("%d:%02d:%02d", timeInSec / 3600, - (timeInSec % 3600) / 60, timeInSec % 60); - sb.append(" Time: " + timeToPrint); - stream.print(sb.toString()); } -} \ No newline at end of file + + /** + * Start a task with only out parameters. + * + * @param task Supplier that represents the task + * @param Generic return type + * @return Whatever the supllier produces + */ + public T start(Supplier task) { + + startTime = System.nanoTime(); + T result = null; + + try { + + progressBar.start(); + result = task.get(); + + } catch (Exception e) { + exception = true; + } finally { + + try { + progressBar.join(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + return result; + } + } + + /** + * Start a task with in/out parameters. + * + * @param input Input of the function + * @param task A Function that does the task + * @param type of the input + * @param return type + * @return Whatever the Function returns + */ + public R start(T input, Function task) { + + startTime = System.nanoTime(); + R result = null; + + try { + + progressBar.start(); + result = task.apply(input); + + } catch (Exception e) { + exception = true; + throw e; + } finally { + + try { + progressBar.join(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + return result; + } + } + + /** + * Increment the progress with one step. + */ + public void incrementProgress() { + currentValue.incrementAndGet(); + } + + private class ProgressBarThread implements Runnable { + + @Override + public void run() { + try { + + stream.println(); + long value; + + while ((value = currentValue.get()) < maxValue) { + print(value); + + if (exception) { + break; + } + Thread.sleep(REFRESH_INTERVAL); + } + + if (exception) { + stream.println(); + stream.println("Incomplete termination, " + "check log for " + + "exception."); + } else { + print(maxValue); + } + stream.println(); + } catch (InterruptedException e) { + stream.println(e); + } + } + + /** + * Given current value prints the progress bar. + * + * @param value current progress position + */ + private void print(long value) { + stream.print('\r'); + double percent = 100.0 * value / maxValue; + StringBuilder sb = new StringBuilder(); + sb.append(" " + String.format("%.2f", percent) + "% |"); + + for (int i = 0; i <= percent; i++) { + sb.append('█'); + } + for (int j = 0; j < 100 - percent; j++) { + sb.append(' '); + } + sb.append("| "); + sb.append(value + "/" + maxValue); + long timeInSec = TimeUnit.SECONDS.convert( + System.nanoTime() - startTime, TimeUnit.NANOSECONDS); + String timeToPrint = String.format("%d:%02d:%02d", timeInSec / 3600, + (timeInSec % 3600) / 60, timeInSec % 60); + sb.append(" Time: " + timeToPrint); + stream.print(sb.toString()); + } + } + +} diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/RandomKeyGenerator.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/RandomKeyGenerator.java index ebaece28c6..184b075e1c 100644 --- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/RandomKeyGenerator.java +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/RandomKeyGenerator.java @@ -185,7 +185,6 @@ enum FreonOps { private ArrayList histograms = new ArrayList<>(); private OzoneConfiguration ozoneConfiguration; - private ProgressBar progressbar; RandomKeyGenerator() { } @@ -252,26 +251,13 @@ public Void call() throws Exception { validator.start(); LOG.info("Data validation is enabled."); } - - Supplier currentValue; - long maxValue; - - currentValue = () -> numberOfKeysAdded.get(); - maxValue = numOfVolumes * - numOfBuckets * - numOfKeys; - - progressbar = new ProgressBar(System.out, maxValue, currentValue); - + Thread progressbar = getProgressBarThread(); LOG.info("Starting progress bar Thread."); - progressbar.start(); - processor.shutdown(); processor.awaitTermination(Integer.MAX_VALUE, TimeUnit.MILLISECONDS); - - progressbar.shutdown(); - + completed = true; + progressbar.join(); if (validateWrites) { validator.join(); } @@ -910,6 +896,73 @@ public String[] getTenQuantileKeyWriteTime() { } } + private class ProgressBar implements Runnable { + + private static final long REFRESH_INTERVAL = 1000L; + + private PrintStream stream; + private Supplier currentValue; + private long maxValue; + + ProgressBar(PrintStream stream, Supplier currentValue, + long maxValue) { + this.stream = stream; + this.currentValue = currentValue; + this.maxValue = maxValue; + } + + @Override + public void run() { + try { + stream.println(); + long value; + while ((value = currentValue.get()) < maxValue) { + print(value); + if (completed) { + break; + } + Thread.sleep(REFRESH_INTERVAL); + } + if (exception) { + stream.println(); + stream.println("Incomplete termination, " + + "check log for exception."); + } else { + print(maxValue); + } + stream.println(); + } catch (InterruptedException e) { + } + } + + /** + * Given current value prints the progress bar. + * + * @param value + */ + private void print(long value) { + stream.print('\r'); + double percent = 100.0 * value / maxValue; + StringBuilder sb = new StringBuilder(); + sb.append(" " + String.format("%.2f", percent) + "% |"); + + for (int i = 0; i <= percent; i++) { + sb.append('█'); + } + for (int j = 0; j < 100 - percent; j++) { + sb.append(' '); + } + sb.append("| "); + sb.append(value + "/" + maxValue); + long timeInSec = TimeUnit.SECONDS.convert( + System.nanoTime() - startTime, TimeUnit.NANOSECONDS); + String timeToPrint = String.format("%d:%02d:%02d", timeInSec / 3600, + (timeInSec % 3600) / 60, timeInSec % 60); + sb.append(" Time: " + timeToPrint); + stream.print(sb); + } + } + /** * Validates the write done in ozone cluster. */ diff --git a/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestProgressBar.java b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestProgressBar.java index 90366dafd2..cea7be8183 100644 --- a/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestProgressBar.java +++ b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestProgressBar.java @@ -22,15 +22,12 @@ import org.mockito.junit.MockitoJUnitRunner; import java.io.PrintStream; -import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; import java.util.function.Supplier; -import java.util.stream.LongStream; +import java.util.stream.IntStream; import static org.mockito.Mockito.*; -/** - * Using Mockito runner. - */ @RunWith(MockitoJUnitRunner.class) /** * Tests for the Progressbar class for Freon. @@ -38,36 +35,78 @@ public class TestProgressBar { private PrintStream stream; - private AtomicLong numberOfKeysAdded; - private Supplier currentValue; @Before public void setupMock() { - numberOfKeysAdded = new AtomicLong(0L); - currentValue = () -> numberOfKeysAdded.get(); stream = mock(PrintStream.class); } @Test public void testWithRunnable() { - Long maxValue = 10L; + int maxValue = 10; - ProgressBar progressbar = new ProgressBar(stream, maxValue, currentValue); + ProgressBar progressbar = new ProgressBar(stream, maxValue); Runnable task = () -> { - LongStream.range(0, maxValue).forEach( + IntStream.range(0, maxValue).forEach( counter -> { - numberOfKeysAdded.getAndIncrement(); + progressbar.incrementProgress(); } ); }; - progressbar.start(); - task.run(); - progressbar.shutdown(); + progressbar.start(task); verify(stream, atLeastOnce()).print(anyChar()); verify(stream, atLeastOnce()).print(anyString()); } + + @Test + public void testWithSupplier() { + + int maxValue = 10; + + ProgressBar progressbar = new ProgressBar(stream, maxValue); + + Supplier tasks = () -> { + IntStream.range(0, maxValue).forEach( + counter -> { + progressbar.incrementProgress(); + } + ); + + return 1L; //return the result of the dummy task + }; + + progressbar.start(tasks); + + verify(stream, atLeastOnce()).print(anyChar()); + verify(stream, atLeastOnce()).print(anyString()); + } + + @Test + public void testWithFunction() { + + int maxValue = 10; + Long result; + + ProgressBar progressbar = new ProgressBar(stream, maxValue); + + Function task = (Long l) -> { + IntStream.range(0, maxValue).forEach( + counter -> { + progressbar.incrementProgress(); + } + ); + + return "dummy result"; //return the result of the dummy task + }; + + progressbar.start(1L, task); + + verify(stream, atLeastOnce()).print(anyChar()); + verify(stream, atLeastOnce()).print(anyString()); + } + }