diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java index 810311583c..8c25347449 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java @@ -64,18 +64,23 @@ public class OzoneManagerDoubleBuffer { private final AtomicLong flushIterations = new AtomicLong(0); private volatile boolean isRunning; + private final OzoneManagerRatisSnapshot ozoneManagerRatisSnapShot; - public OzoneManagerDoubleBuffer(OMMetadataManager omMetadataManager) { + public OzoneManagerDoubleBuffer(OMMetadataManager omMetadataManager, + OzoneManagerRatisSnapshot ozoneManagerRatisSnapShot) { this.currentBuffer = new ConcurrentLinkedQueue<>(); this.readyBuffer = new ConcurrentLinkedQueue<>(); this.omMetadataManager = omMetadataManager; + this.ozoneManagerRatisSnapShot = ozoneManagerRatisSnapShot; + isRunning = true; // Daemon thread which runs in back ground and flushes transactions to DB. daemon = new Daemon(this::flushTransactions); daemon.setName("OMDoubleBufferFlushThread"); daemon.start(); + } /** @@ -117,7 +122,13 @@ private void flushTransactions() { readyBuffer.clear(); // cleanup cache. cleanupCache(lastRatisTransactionIndex); - // TODO: update the last updated index in OzoneManagerStateMachine. + + // TODO: Need to revisit this logic, once we have multiple + // executors for volume/bucket request handling. As for now + // transactions are serialized this should be fine. + // update the last updated index in OzoneManagerStateMachine. + ozoneManagerRatisSnapShot.updateLastAppliedIndex( + lastRatisTransactionIndex); } } catch (InterruptedException ex) { Thread.currentThread().interrupt(); diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisSnapshot.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisSnapshot.java new file mode 100644 index 0000000000..518026184a --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisSnapshot.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.om.ratis;
+
+/**
+ * Functional interface for OM RatisSnapshot.
+ */
+
+public interface OzoneManagerRatisSnapshot {
+
+ /**
+ * Update lastAppliedIndex with the specified value in OzoneManager
+ * StateMachine.
+ * @param lastAppliedIndex
+ */
+ void updateLastAppliedIndex(long lastAppliedIndex);
+}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
index 2577cb5dbe..718967a905 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
@@ -75,7 +75,8 @@ public OzoneManagerStateMachine(OzoneManagerRatisServer ratisServer) {
this.omRatisServer = ratisServer;
this.ozoneManager = omRatisServer.getOzoneManager();
this.ozoneManagerDoubleBuffer =
- new OzoneManagerDoubleBuffer(ozoneManager.getMetadataManager());
+ new OzoneManagerDoubleBuffer(ozoneManager.getMetadataManager(),
+ this::updateLastAppliedIndex);
this.handler = new OzoneManagerHARequestHandlerImpl(ozoneManager,
ozoneManagerDoubleBuffer);
}
@@ -375,6 +376,11 @@ private Message runCommand(OMRequest request, long trxLogIndex) {
return OMRatisHelper.convertResponseToMessage(response);
}
+ @SuppressWarnings("HiddenField")
+ public void updateLastAppliedIndex(long lastAppliedIndex) {
+ this.lastAppliedIndex = lastAppliedIndex;
+ }
+
/**
* Submits read request to OM and returns the response Message.
* @param request OMRequest
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBufferWithDummyResponse.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBufferWithDummyResponse.java
index c616a28d43..116595500c 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBufferWithDummyResponse.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBufferWithDummyResponse.java
@@ -55,6 +55,9 @@ public class TestOzoneManagerDoubleBufferWithDummyResponse {
private OMMetadataManager omMetadataManager;
private OzoneManagerDoubleBuffer doubleBuffer;
private AtomicLong trxId = new AtomicLong(0);
+ private OzoneManagerRatisSnapshot ozoneManagerRatisSnapshot;
+ private long lastAppliedIndex;
+
@Rule
public TemporaryFolder folder = new TemporaryFolder();
@@ -66,7 +69,11 @@ public void setup() throws IOException {
folder.newFolder().getAbsolutePath());
omMetadataManager =
new OmMetadataManagerImpl(configuration);
- doubleBuffer = new OzoneManagerDoubleBuffer(omMetadataManager);
+ ozoneManagerRatisSnapshot = index -> {
+ lastAppliedIndex = index;
+ };
+ doubleBuffer = new OzoneManagerDoubleBuffer(omMetadataManager,
+ ozoneManagerRatisSnapshot);
}
@After
@@ -94,6 +101,9 @@ public void testDoubleBufferWithDummyResponse() throws Exception {
Assert.assertTrue(omMetadataManager.countRowsInTable(
omMetadataManager.getBucketTable()) == (bucketCount));
Assert.assertTrue(doubleBuffer.getFlushIterations() > 0);
+
+ // Check lastAppliedIndex is updated correctly or not.
+ Assert.assertEquals(bucketCount, lastAppliedIndex);
}
/**
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBufferWithOMResponse.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBufferWithOMResponse.java
index 3b544449ef..6a0bcb6a05 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBufferWithOMResponse.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBufferWithOMResponse.java
@@ -65,6 +65,8 @@ public class TestOzoneManagerDoubleBufferWithOMResponse {
private OMMetadataManager omMetadataManager;
private OzoneManagerDoubleBuffer doubleBuffer;
private AtomicLong trxId = new AtomicLong(0);
+ private OzoneManagerRatisSnapshot ozoneManagerRatisSnapshot;
+ private long lastAppliedIndex;
@Rule
public TemporaryFolder folder = new TemporaryFolder();
@@ -76,7 +78,11 @@ public void setup() throws IOException {
folder.newFolder().getAbsolutePath());
omMetadataManager =
new OmMetadataManagerImpl(configuration);
- doubleBuffer = new OzoneManagerDoubleBuffer(omMetadataManager);
+ ozoneManagerRatisSnapshot = index -> {
+ lastAppliedIndex = index;
+ };
+ doubleBuffer = new OzoneManagerDoubleBuffer(omMetadataManager,
+ ozoneManagerRatisSnapshot);
}
@After
@@ -146,6 +152,9 @@ public void testDoubleBufferWithMixOfTransactions() throws Exception {
checkCreateBuckets(bucketQueue);
checkDeletedBuckets(deleteBucketQueue);
+
+ // Check lastAppliedIndex is updated correctly or not.
+ Assert.assertEquals(bucketCount + deleteCount + 1, lastAppliedIndex);
}
/**
@@ -208,6 +217,9 @@ public void testDoubleBufferWithMixOfTransactionsParallel() throws Exception {
checkCreateBuckets(bucketQueue);
checkDeletedBuckets(deleteBucketQueue);
+
+ // Check lastAppliedIndex is updated correctly or not.
+ Assert.assertEquals(bucketCount + deleteCount + 2, lastAppliedIndex);
}
/**
@@ -321,6 +333,8 @@ private void checkDeletedBuckets(Queue