diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
index 1bd5a70deb..d9eb0b9c44 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
@@ -19,11 +19,16 @@
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.protobuf.ServiceException;
 
 import java.io.IOException;
 import java.util.Collection;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.hadoop.ozone.container.common.transport.server.ratis
     .ContainerStateMachine;
 import org.apache.hadoop.ozone.om.OzoneManager;
@@ -38,6 +43,7 @@
 import org.apache.hadoop.ozone.protocolPB.OzoneManagerHARequestHandler;
 import org.apache.hadoop.ozone.protocolPB.OzoneManagerHARequestHandlerImpl;
 import org.apache.hadoop.util.Time;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
 import org.apache.ratis.proto.RaftProtos;
 import org.apache.ratis.protocol.Message;
 import org.apache.ratis.protocol.RaftClientRequest;
@@ -70,6 +76,7 @@ public class OzoneManagerStateMachine extends BaseStateMachine {
   private RaftGroupId raftGroupId;
   private long lastAppliedIndex = 0;
   private final OzoneManagerDoubleBuffer ozoneManagerDoubleBuffer;
+  private final ExecutorService executorService;
 
   public OzoneManagerStateMachine(OzoneManagerRatisServer ratisServer) {
     this.omRatisServer = ratisServer;
@@ -79,6 +86,9 @@ public OzoneManagerStateMachine(OzoneManagerRatisServer ratisServer) {
         this::updateLastAppliedIndex);
     this.handler = new OzoneManagerHARequestHandlerImpl(ozoneManager,
         ozoneManagerDoubleBuffer);
+    ThreadFactory build = new ThreadFactoryBuilder().setDaemon(true)
+        .setNameFormat("OM StateMachine ApplyTransaction Thread - %d").build();
+    this.executorService = HadoopExecutors.newSingleThreadExecutor(build);
   }
 
   /**
@@ -132,8 +142,36 @@ public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
       OMRequest request = OMRatisHelper.convertByteStringToOMRequest(
           trx.getStateMachineLogEntry().getLogData());
       long trxLogIndex = trx.getLogEntry().getIndex();
-      CompletableFuture<Message> future = CompletableFuture
-          .supplyAsync(() -> runCommand(request, trxLogIndex));
+      // In the current approach we have a single global executor with one
+      // thread. Right now this is done for correctness: since
+      // applyTransaction runs on multiple OMs, we want to execute the
+      // transactions in the same order on all OMs, otherwise there is a
+      // chance that the OM replicas can go out of sync.
+      // TODO: This way we are applying all transactions in the OM in
+      // serial order. Revisit this in the future to use multiple executors
+      // per volume/bucket.
+
+      // The reason for not implementing an executor per volume right away
+      // is that if one executor's operations are slow, we cannot update the
+      // lastAppliedIndex in the OzoneManager StateMachine, even if another
+      // executor has completed transactions with higher indexes.
+
+      // Example: we have 300 transactions, and each volume owns 150 of
+      // them: Volume1 owns transactions 0 - 149 and Volume2 owns
+      // transactions 150 - 299.
+      // Executor1 - Volume1 - 100 (latest completed transaction)
+      // Executor2 - Volume2 - 299 (latest completed transaction)
+
+      // Now we have applied transactions 0 - 100 and 150 - 299. We
+      // cannot update lastAppliedIndex to 299; we need to keep it at 100,
+      // since 101 - 149 are not yet applied, and when the OM restarts it
+      // replays transactions from lastAppliedIndex.
+      // We can only move lastAppliedIndex from 100 to 299 after
+      // completing 101 - 149. For the initial stage we start with a
+      // single global executor and will revisit this when needed.
+
+      CompletableFuture<Message> future = CompletableFuture.supplyAsync(
+          () -> runCommand(request, trxLogIndex), executorService);
       return future;
     } catch (IOException e) {
       return completeExceptionally(e);
@@ -301,6 +339,7 @@ public void setRaftGroupId(RaftGroupId raftGroupId) {
 
   public void stop() {
     ozoneManagerDoubleBuffer.stop();
+    HadoopExecutors.shutdown(executorService, LOG, 5, TimeUnit.SECONDS);
   }
 }
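
Illustrative note (not part of the patch): the correctness argument in the new comment rests on the ordering guarantee of a single-thread executor. Tasks handed to it run strictly in submission order, so every OM that submits applyTransaction work in Raft log order also applies it in that order. The stand-alone sketch below demonstrates that guarantee; it uses plain java.util.concurrent.Executors instead of HadoopExecutors and Guava's ThreadFactoryBuilder to stay self-contained, and the names SerialApplyDemo, appliedOrder, and the list update standing in for runCommand(request, trxLogIndex) are hypothetical.

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

/**
 * Stand-alone sketch: a single-thread executor runs tasks strictly in
 * submission order, which is what makes the serialized applyTransaction
 * approach safe across OM replicas. Not OM code; names are illustrative.
 */
public final class SerialApplyDemo {

  public static void main(String[] args) {
    // Mirrors the daemon thread factory set up in the patch, using plain
    // JDK APIs instead of Guava's ThreadFactoryBuilder.
    ThreadFactory factory = r -> {
      Thread t = new Thread(r, "ApplyTransaction-Demo-Thread");
      t.setDaemon(true);
      return t;
    };
    ExecutorService executorService =
        Executors.newSingleThreadExecutor(factory);

    List<Long> appliedOrder = new CopyOnWriteArrayList<>();
    CompletableFuture<?>[] futures = new CompletableFuture<?>[10];

    for (int i = 0; i < futures.length; i++) {
      final long trxLogIndex = i;
      // Same pattern as the patched applyTransaction:
      //   CompletableFuture.supplyAsync(
      //       () -> runCommand(request, trxLogIndex), executorService)
      futures[i] = CompletableFuture.supplyAsync(() -> {
        appliedOrder.add(trxLogIndex);   // stand-in for runCommand(...)
        return trxLogIndex;
      }, executorService);
    }

    CompletableFuture.allOf(futures).join();
    // Always prints 0..9 in order: the single thread preserves submission
    // order, so every replica that submits in log order applies in log order.
    System.out.println("Applied order: " + appliedOrder);

    executorService.shutdown();
  }
}

With the removed single-argument supplyAsync call, tasks ran on the common ForkJoinPool, whose multiple worker threads can interleave independent tasks; that is exactly the replica-divergence risk the new comment describes, and why the patch pins execution to one thread.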