diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java index f87a32d9e5..42a4d997a8 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java @@ -140,7 +140,6 @@ public class ContainerStateMachine extends BaseStateMachine { private ExecutorService[] executors; private final int numExecutors; private final Map applyTransactionCompletionMap; - private long lastIndex; private final Cache stateMachineDataCache; private final boolean isBlockTokenEnabled; private final TokenVerifier tokenVerifier; @@ -163,7 +162,6 @@ public ContainerStateMachine(RaftGroupId gid, ContainerDispatcher dispatcher, this.executors = executors.toArray(new ExecutorService[numExecutors]); this.writeChunkFutureMap = new ConcurrentHashMap<>(); applyTransactionCompletionMap = new ConcurrentHashMap<>(); - this.lastIndex = RaftServerConstants.INVALID_LOG_INDEX; stateMachineDataCache = CacheBuilder.newBuilder() .expireAfterAccess(expiryInterval, TimeUnit.MILLISECONDS) // set the limit on no of cached entries equal to no of max threads @@ -202,7 +200,6 @@ private long loadSnapshot(SingleFileSnapshotInfo snapshot) "The snapshot info is null." + "Setting the last applied index to:" + empty); setLastAppliedTermIndex(empty); - lastIndex = RaftServerConstants.INVALID_LOG_INDEX; return RaftServerConstants.INVALID_LOG_INDEX; } @@ -211,7 +208,6 @@ private long loadSnapshot(SingleFileSnapshotInfo snapshot) SimpleStateMachineStorage.getTermIndexFromSnapshotFile(snapshotFile); LOG.info("Setting the last applied index to " + last); setLastAppliedTermIndex(last); - lastIndex = last.getIndex(); // initialize the dispatcher with snapshot so that it build the missing // container list @@ -572,6 +568,18 @@ private void updateLastApplied() { } } + /** + * Notifies the state machine about index updates because of entries + * which do not cause state machine update, i.e. conf entries, metadata + * entries + * @param term term of the log entry + * @param index index of the log entry + */ + @Override + public void notifyIndexUpdate(long term, long index) { + applyTransactionCompletionMap.put(index, term); + } + /* * ApplyTransaction calls in Ratis are sequential. */ @@ -583,14 +591,6 @@ public CompletableFuture applyTransaction(TransactionContext trx) { .setTerm(trx.getLogEntry().getTerm()) .setLogIndex(index); - // ApplyTransaction call can come with an entryIndex much greater than - // lastIndex updated because in between entries in the raft log can be - // appended because raft config persistence. Just add a dummy entry - // for those. - for (long i = lastIndex + 1; i < index; i++) { - LOG.info("Gap in indexes at:{} detected, adding dummy entries ", i); - applyTransactionCompletionMap.put(i, trx.getLogEntry().getTerm()); - } try { metrics.incNumApplyTransactionsOps(); ContainerCommandRequestProto requestProto = @@ -613,7 +613,6 @@ public CompletableFuture applyTransaction(TransactionContext trx) { .supplyAsync(() -> runCommand(requestProto, builder.build()), getCommandExecutor(requestProto)); - lastIndex = index; future.thenAccept(m -> { final Long previous = applyTransactionCompletionMap diff --git a/hadoop-hdds/pom.xml b/hadoop-hdds/pom.xml index e7cced800b..6f95547602 100644 --- a/hadoop-hdds/pom.xml +++ b/hadoop-hdds/pom.xml @@ -46,7 +46,7 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd"> 0.4.0-SNAPSHOT - 0.4.0-f283ffa-SNAPSHOT + 0.4.0-5680cf5-SNAPSHOT 1.60 diff --git a/hadoop-ozone/pom.xml b/hadoop-ozone/pom.xml index 9884215fd7..77b36321ac 100644 --- a/hadoop-ozone/pom.xml +++ b/hadoop-ozone/pom.xml @@ -29,7 +29,7 @@ 3.2.0 0.4.0-SNAPSHOT 0.4.0-SNAPSHOT - 0.4.0-f283ffa-SNAPSHOT + 0.4.0-5680cf5-SNAPSHOT 1.60 Badlands ${ozone.version}