diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
index cedcc43cbb..b748d69d5f 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
@@ -74,7 +74,7 @@ public final class ScmConfigKeys {
public static final String DFS_CONTAINER_RATIS_SEGMENT_SIZE_KEY =
"dfs.container.ratis.segment.size";
public static final int DFS_CONTAINER_RATIS_SEGMENT_SIZE_DEFAULT =
- 1 * 1024 * 1024 * 1024;
+ 16 * 1024;
public static final String DFS_CONTAINER_RATIS_SEGMENT_PREALLOCATED_SIZE_KEY =
"dfs.container.ratis.segment.preallocated.size";
public static final int
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 54bffd5723..e94e7e14eb 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -175,10 +175,10 @@
dfs.container.ratis.segment.size
- 1073741824
+ 16384
OZONE, RATIS, PERFORMANCE
The size of the raft segment used by Apache Ratis on datanodes.
- (1 GB by default)
+ (16 KB by default)
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index 3899bdec78..a3b496a7e8 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -120,7 +120,8 @@ public class ContainerStateMachine extends BaseStateMachine {
createContainerFutureMap;
private ExecutorService[] executors;
private final int numExecutors;
- private final Map containerCommandCompletionMap;
+ private final Map applyTransactionCompletionMap;
+ private long lastIndex;
/**
* CSM metrics.
*/
@@ -138,7 +139,8 @@ public ContainerStateMachine(RaftGroupId gid, ContainerDispatcher dispatcher,
this.executors = executors.toArray(new ExecutorService[numExecutors]);
this.writeChunkFutureMap = new ConcurrentHashMap<>();
this.createContainerFutureMap = new ConcurrentHashMap<>();
- containerCommandCompletionMap = new ConcurrentHashMap<>();
+ applyTransactionCompletionMap = new ConcurrentHashMap<>();
+ this.lastIndex = RaftServerConstants.INVALID_LOG_INDEX;
}
@Override
@@ -162,10 +164,12 @@ public void initialize(
private long loadSnapshot(SingleFileSnapshotInfo snapshot) {
if (snapshot == null) {
- TermIndex empty = TermIndex.newTermIndex(0, 0);
+ TermIndex empty = TermIndex.newTermIndex(0,
+ RaftServerConstants.INVALID_LOG_INDEX);
LOG.info("The snapshot info is null." +
"Setting the last applied index to:" + empty);
setLastAppliedTermIndex(empty);
+ lastIndex = RaftServerConstants.INVALID_LOG_INDEX;
return RaftServerConstants.INVALID_LOG_INDEX;
}
@@ -174,6 +178,7 @@ private long loadSnapshot(SingleFileSnapshotInfo snapshot) {
snapshot.getFile().getPath().toFile());
LOG.info("Setting the last applied index to " + last);
setLastAppliedTermIndex(last);
+ lastIndex = last.getIndex();
return last.getIndex();
}
@@ -471,7 +476,7 @@ private void updateLastApplied() {
Long appliedTerm = null;
long appliedIndex = -1;
for(long i = getLastAppliedTermIndex().getIndex() + 1;; i++) {
- final Long removed = containerCommandCompletionMap.remove(i);
+ final Long removed = applyTransactionCompletionMap.remove(i);
if (removed == null) {
break;
}
@@ -479,7 +484,7 @@ private void updateLastApplied() {
appliedIndex = i;
}
if (appliedTerm != null) {
- updateLastAppliedTermIndex(appliedIndex, appliedTerm);
+ updateLastAppliedTermIndex(appliedTerm, appliedIndex);
}
}
@@ -489,6 +494,15 @@ private void updateLastApplied() {
@Override
public CompletableFuture applyTransaction(TransactionContext trx) {
long index = trx.getLogEntry().getIndex();
+
+ // An applyTransaction call can arrive with an entry index much greater
+ // than lastIndex, because other entries (e.g. for raft config
+ // persistence) can be appended to the raft log in between. Add a
+ // dummy entry for each such index.
+ for (long i = lastIndex + 1; i < index; i++) {
+ LOG.info("Gap in indexes at:{} detected, adding dummy entries ", i);
+ applyTransactionCompletionMap.put(i, trx.getLogEntry().getTerm());
+ }
try {
metrics.incNumApplyTransactionsOps();
ContainerCommandRequestProto requestProto =
@@ -553,9 +567,10 @@ public CompletableFuture applyTransaction(TransactionContext trx) {
});
}
+ lastIndex = index;
future.thenAccept(m -> {
final Long previous =
- containerCommandCompletionMap
+ applyTransactionCompletionMap
.put(index, trx.getLogEntry().getTerm());
Preconditions.checkState(previous == null);
updateLastApplied();