diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ProgressBar.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ProgressBar.java index a8d7e73c73..a987eeadde 100644 --- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ProgressBar.java +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ProgressBar.java @@ -14,197 +14,134 @@ * License for the specific language governing permissions and limitations under * the License. */ + package org.apache.hadoop.ozone.freon; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.PrintStream; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; -import java.util.function.Function; import java.util.function.Supplier; /** - * Run an arbitrary code and print progress on the provided stream. The - * progressbar stops when: - the provided currentvalue is less the the maxvalue - * - exception thrown + * Creates and runs a ProgressBar in new Thread which gets printed on + * the provided PrintStream. */ public class ProgressBar { + private static final Logger LOG = LoggerFactory.getLogger(ProgressBar.class); private static final long REFRESH_INTERVAL = 1000L; - private PrintStream stream; - private AtomicLong currentValue; - private long maxValue; - private Thread progressBar; - private volatile boolean exception = false; - private long startTime; + private final long maxValue; + private final Supplier currentValue; + private final Thread progressBar; + + private volatile boolean running; + + private volatile long startTime; /** - * @param stream Used to display the progress + * Creates a new ProgressBar instance which prints the progress on the given + * PrintStream when started. + * + * @param stream to display the progress * @param maxValue Maximum value of the progress + * @param currentValue Supplier that provides the current value */ - ProgressBar(PrintStream stream, long maxValue) { - this.stream = stream; + public ProgressBar(final PrintStream stream, final Long maxValue, + final Supplier currentValue) { this.maxValue = maxValue; - this.currentValue = new AtomicLong(0); - this.progressBar = new Thread(new ProgressBarThread()); + this.currentValue = currentValue; + this.progressBar = new Thread(getProgressBar(stream)); + this.running = false; } /** - * Start a task with a progessbar without any in/out parameters Runnable used - * just a task wrapper. - * - * @param task Runnable + * Starts the ProgressBar in a new Thread. + * This is a non blocking call. */ - public void start(Runnable task) { - - startTime = System.nanoTime(); - - try { - + public synchronized void start() { + if (!running) { + running = true; + startTime = System.nanoTime(); progressBar.start(); - task.run(); - - } catch (Exception e) { - exception = true; - } finally { + } + } + /** + * Graceful shutdown, waits for the progress bar to complete. + * This is a blocking call. + */ + public synchronized void shutdown() { + if (running) { try { progressBar.join(); + running = false; } catch (InterruptedException e) { - e.printStackTrace(); + LOG.warn("Got interrupted while waiting for the progress bar to " + + "complete."); } } } /** - * Start a task with only out parameters. - * - * @param task Supplier that represents the task - * @param Generic return type - * @return Whatever the supllier produces + * Terminates the progress bar. This doesn't wait for the progress bar + * to complete. */ - public T start(Supplier task) { - - startTime = System.nanoTime(); - T result = null; - - try { - - progressBar.start(); - result = task.get(); - - } catch (Exception e) { - exception = true; - } finally { - + public synchronized void terminate() { + if (running) { try { + running = false; progressBar.join(); } catch (InterruptedException e) { - e.printStackTrace(); + LOG.warn("Got interrupted while waiting for the progress bar to " + + "complete."); } - - return result; } } - /** - * Start a task with in/out parameters. - * - * @param input Input of the function - * @param task A Function that does the task - * @param type of the input - * @param return type - * @return Whatever the Function returns - */ - public R start(T input, Function task) { - - startTime = System.nanoTime(); - R result = null; - - try { - - progressBar.start(); - result = task.apply(input); - - } catch (Exception e) { - exception = true; - throw e; - } finally { - - try { - progressBar.join(); - } catch (InterruptedException e) { - e.printStackTrace(); - } - - return result; - } - } - - /** - * Increment the progress with one step. - */ - public void incrementProgress() { - currentValue.incrementAndGet(); - } - - private class ProgressBarThread implements Runnable { - - @Override - public void run() { - try { - - stream.println(); - long value; - - while ((value = currentValue.get()) < maxValue) { - print(value); - - if (exception) { - break; - } + private Runnable getProgressBar(final PrintStream stream) { + return () -> { + stream.println(); + while (running && currentValue.get() < maxValue) { + print(stream, currentValue.get()); + try { Thread.sleep(REFRESH_INTERVAL); + } catch (InterruptedException e) { + LOG.warn("ProgressBar was interrupted."); } - - if (exception) { - stream.println(); - stream.println("Incomplete termination, " + "check log for " + - "exception."); - } else { - print(maxValue); - } - stream.println(); - } catch (InterruptedException e) { - stream.println(e); } - } - - /** - * Given current value prints the progress bar. - * - * @param value current progress position - */ - private void print(long value) { - stream.print('\r'); - double percent = 100.0 * value / maxValue; - StringBuilder sb = new StringBuilder(); - sb.append(" " + String.format("%.2f", percent) + "% |"); - - for (int i = 0; i <= percent; i++) { - sb.append('█'); - } - for (int j = 0; j < 100 - percent; j++) { - sb.append(' '); - } - sb.append("| "); - sb.append(value + "/" + maxValue); - long timeInSec = TimeUnit.SECONDS.convert( - System.nanoTime() - startTime, TimeUnit.NANOSECONDS); - String timeToPrint = String.format("%d:%02d:%02d", timeInSec / 3600, - (timeInSec % 3600) / 60, timeInSec % 60); - sb.append(" Time: " + timeToPrint); - stream.print(sb.toString()); - } + print(stream, maxValue); + stream.println(); + running = false; + }; } -} + /** + * Given current value prints the progress bar. + * + * @param value current progress position + */ + private void print(final PrintStream stream, final long value) { + stream.print('\r'); + double percent = 100.0 * value / maxValue; + StringBuilder sb = new StringBuilder(); + sb.append(" " + String.format("%.2f", percent) + "% |"); + + for (int i = 0; i <= percent; i++) { + sb.append('█'); + } + for (int j = 0; j < 100 - percent; j++) { + sb.append(' '); + } + sb.append("| "); + sb.append(value + "/" + maxValue); + long timeInSec = TimeUnit.SECONDS.convert( + System.nanoTime() - startTime, TimeUnit.NANOSECONDS); + String timeToPrint = String.format("%d:%02d:%02d", timeInSec / 3600, + (timeInSec % 3600) / 60, timeInSec % 60); + sb.append(" Time: " + timeToPrint); + stream.print(sb.toString()); + } +} \ No newline at end of file diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/RandomKeyGenerator.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/RandomKeyGenerator.java index 184b075e1c..5d72f5577e 100644 --- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/RandomKeyGenerator.java +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/RandomKeyGenerator.java @@ -185,6 +185,7 @@ enum FreonOps { private ArrayList histograms = new ArrayList<>(); private OzoneConfiguration ozoneConfiguration; + private ProgressBar progressbar; RandomKeyGenerator() { } @@ -251,13 +252,26 @@ public Void call() throws Exception { validator.start(); LOG.info("Data validation is enabled."); } - Thread progressbar = getProgressBarThread(); + + Supplier currentValue; + long maxValue; + + currentValue = () -> numberOfKeysAdded.get(); + maxValue = numOfVolumes * + numOfBuckets * + numOfKeys; + + progressbar = new ProgressBar(System.out, maxValue, currentValue); + LOG.info("Starting progress bar Thread."); + progressbar.start(); + processor.shutdown(); processor.awaitTermination(Integer.MAX_VALUE, TimeUnit.MILLISECONDS); - completed = true; - progressbar.join(); + + progressbar.shutdown(); + if (validateWrites) { validator.join(); } @@ -280,22 +294,6 @@ private void addShutdownHook() { Runtime.getRuntime().addShutdownHook( new Thread(() -> printStats(System.out))); } - - private Thread getProgressBarThread() { - Supplier currentValue; - long maxValue; - - currentValue = () -> numberOfKeysAdded.get(); - maxValue = (long) numOfVolumes * - numOfBuckets * - numOfKeys; - - Thread progressBarThread = new Thread( - new ProgressBar(System.out, currentValue, maxValue)); - progressBarThread.setName("ProgressBar"); - return progressBarThread; - } - /** * Prints stats of {@link Freon} run to the PrintStream. * @@ -896,73 +894,6 @@ public String[] getTenQuantileKeyWriteTime() { } } - private class ProgressBar implements Runnable { - - private static final long REFRESH_INTERVAL = 1000L; - - private PrintStream stream; - private Supplier currentValue; - private long maxValue; - - ProgressBar(PrintStream stream, Supplier currentValue, - long maxValue) { - this.stream = stream; - this.currentValue = currentValue; - this.maxValue = maxValue; - } - - @Override - public void run() { - try { - stream.println(); - long value; - while ((value = currentValue.get()) < maxValue) { - print(value); - if (completed) { - break; - } - Thread.sleep(REFRESH_INTERVAL); - } - if (exception) { - stream.println(); - stream.println("Incomplete termination, " + - "check log for exception."); - } else { - print(maxValue); - } - stream.println(); - } catch (InterruptedException e) { - } - } - - /** - * Given current value prints the progress bar. - * - * @param value - */ - private void print(long value) { - stream.print('\r'); - double percent = 100.0 * value / maxValue; - StringBuilder sb = new StringBuilder(); - sb.append(" " + String.format("%.2f", percent) + "% |"); - - for (int i = 0; i <= percent; i++) { - sb.append('█'); - } - for (int j = 0; j < 100 - percent; j++) { - sb.append(' '); - } - sb.append("| "); - sb.append(value + "/" + maxValue); - long timeInSec = TimeUnit.SECONDS.convert( - System.nanoTime() - startTime, TimeUnit.NANOSECONDS); - String timeToPrint = String.format("%d:%02d:%02d", timeInSec / 3600, - (timeInSec % 3600) / 60, timeInSec % 60); - sb.append(" Time: " + timeToPrint); - stream.print(sb); - } - } - /** * Validates the write done in ozone cluster. */ diff --git a/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestProgressBar.java b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestProgressBar.java index cea7be8183..90366dafd2 100644 --- a/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestProgressBar.java +++ b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestProgressBar.java @@ -22,12 +22,15 @@ import org.mockito.junit.MockitoJUnitRunner; import java.io.PrintStream; -import java.util.function.Function; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; -import java.util.stream.IntStream; +import java.util.stream.LongStream; import static org.mockito.Mockito.*; +/** + * Using Mockito runner. + */ @RunWith(MockitoJUnitRunner.class) /** * Tests for the Progressbar class for Freon. @@ -35,78 +38,36 @@ public class TestProgressBar { private PrintStream stream; + private AtomicLong numberOfKeysAdded; + private Supplier currentValue; @Before public void setupMock() { + numberOfKeysAdded = new AtomicLong(0L); + currentValue = () -> numberOfKeysAdded.get(); stream = mock(PrintStream.class); } @Test public void testWithRunnable() { - int maxValue = 10; + Long maxValue = 10L; - ProgressBar progressbar = new ProgressBar(stream, maxValue); + ProgressBar progressbar = new ProgressBar(stream, maxValue, currentValue); Runnable task = () -> { - IntStream.range(0, maxValue).forEach( + LongStream.range(0, maxValue).forEach( counter -> { - progressbar.incrementProgress(); + numberOfKeysAdded.getAndIncrement(); } ); }; - progressbar.start(task); + progressbar.start(); + task.run(); + progressbar.shutdown(); verify(stream, atLeastOnce()).print(anyChar()); verify(stream, atLeastOnce()).print(anyString()); } - - @Test - public void testWithSupplier() { - - int maxValue = 10; - - ProgressBar progressbar = new ProgressBar(stream, maxValue); - - Supplier tasks = () -> { - IntStream.range(0, maxValue).forEach( - counter -> { - progressbar.incrementProgress(); - } - ); - - return 1L; //return the result of the dummy task - }; - - progressbar.start(tasks); - - verify(stream, atLeastOnce()).print(anyChar()); - verify(stream, atLeastOnce()).print(anyString()); - } - - @Test - public void testWithFunction() { - - int maxValue = 10; - Long result; - - ProgressBar progressbar = new ProgressBar(stream, maxValue); - - Function task = (Long l) -> { - IntStream.range(0, maxValue).forEach( - counter -> { - progressbar.incrementProgress(); - } - ); - - return "dummy result"; //return the result of the dummy task - }; - - progressbar.start(1L, task); - - verify(stream, atLeastOnce()).print(anyChar()); - verify(stream, atLeastOnce()).print(anyString()); - } - }