diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java index 2bde3ad601..e329d5a23a 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java @@ -21,6 +21,7 @@ package org.apache.hadoop.ozone.om.ratis; import java.io.IOException; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import com.google.common.annotations.VisibleForTesting; @@ -64,7 +65,7 @@ public class OzoneManagerDoubleBuffer { private final OMMetadataManager omMetadataManager; private final AtomicLong flushedTransactionCount = new AtomicLong(0); private final AtomicLong flushIterations = new AtomicLong(0); - private volatile boolean isRunning; + private final AtomicBoolean isRunning = new AtomicBoolean(false); private OzoneManagerDoubleBufferMetrics ozoneManagerDoubleBufferMetrics; private long maxFlushedTransactionsInOneIteration; @@ -79,7 +80,7 @@ public class OzoneManagerDoubleBuffer { this.ozoneManagerDoubleBufferMetrics = OzoneManagerDoubleBufferMetrics.create(); - isRunning = true; + isRunning.set(true); // Daemon thread which runs in back ground and flushes transactions to DB. daemon = new Daemon(this::flushTransactions); daemon.setName("OMDoubleBufferFlushThread"); @@ -92,7 +93,7 @@ public class OzoneManagerDoubleBuffer { * and commit to DB. */ private void flushTransactions() { - while(isRunning) { + while (isRunning.get()) { try { if (canFlush()) { setReadyBuffer(); @@ -140,7 +141,7 @@ public class OzoneManagerDoubleBuffer { } } catch (InterruptedException ex) { Thread.currentThread().interrupt(); - if (isRunning) { + if (isRunning.get()) { final String message = "OMDoubleBuffer flush thread " + Thread.currentThread().getName() + " encountered Interrupted " + "exception while running"; @@ -201,11 +202,16 @@ public class OzoneManagerDoubleBuffer { /** * Stop OM DoubleBuffer flush thread. */ - public synchronized void stop() { - if (isRunning) { + public void stop() { + if (isRunning.compareAndSet(true, false)) { LOG.info("Stopping OMDoubleBuffer flush thread"); - isRunning = false; daemon.interrupt(); + try { + // Wait for daemon thread to exit + daemon.join(); + } catch (InterruptedException e) { + LOG.error("Interrupted while waiting for daemon to exit."); + } // stop metrics. ozoneManagerDoubleBufferMetrics.unRegister();