HDDS-1601. Implement updating lastAppliedIndex after buffer flush to OM DB. (#972)

This commit is contained in:
Bharat Viswanadham 2019-06-15 13:47:07 -07:00 committed by GitHub
parent 8370a0ae16
commit e70aeb4d7e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 81 additions and 5 deletions

View File

@ -64,18 +64,23 @@ public class OzoneManagerDoubleBuffer {
private final AtomicLong flushIterations = new AtomicLong(0);
private volatile boolean isRunning;
private final OzoneManagerRatisSnapshot ozoneManagerRatisSnapShot;
public OzoneManagerDoubleBuffer(OMMetadataManager omMetadataManager) {
public OzoneManagerDoubleBuffer(OMMetadataManager omMetadataManager,
OzoneManagerRatisSnapshot ozoneManagerRatisSnapShot) {
this.currentBuffer = new ConcurrentLinkedQueue<>();
this.readyBuffer = new ConcurrentLinkedQueue<>();
this.omMetadataManager = omMetadataManager;
this.ozoneManagerRatisSnapShot = ozoneManagerRatisSnapShot;
isRunning = true;
// Daemon thread which runs in back ground and flushes transactions to DB.
daemon = new Daemon(this::flushTransactions);
daemon.setName("OMDoubleBufferFlushThread");
daemon.start();
}
/**
@ -117,7 +122,13 @@ private void flushTransactions() {
readyBuffer.clear();
// cleanup cache.
cleanupCache(lastRatisTransactionIndex);
// TODO: update the last updated index in OzoneManagerStateMachine.
// TODO: Need to revisit this logic, once we have multiple
// executors for volume/bucket request handling. As for now
// transactions are serialized this should be fine.
// update the last updated index in OzoneManagerStateMachine.
ozoneManagerRatisSnapShot.updateLastAppliedIndex(
lastRatisTransactionIndex);
}
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();

View File

@ -0,0 +1,32 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.om.ratis;
/**
* Functional interface for OM RatisSnapshot.
*/
public interface OzoneManagerRatisSnapshot {
/**
* Update lastAppliedIndex with the specified value in OzoneManager
* StateMachine.
* @param lastAppliedIndex
*/
void updateLastAppliedIndex(long lastAppliedIndex);
}

View File

@ -75,7 +75,8 @@ public OzoneManagerStateMachine(OzoneManagerRatisServer ratisServer) {
this.omRatisServer = ratisServer;
this.ozoneManager = omRatisServer.getOzoneManager();
this.ozoneManagerDoubleBuffer =
new OzoneManagerDoubleBuffer(ozoneManager.getMetadataManager());
new OzoneManagerDoubleBuffer(ozoneManager.getMetadataManager(),
this::updateLastAppliedIndex);
this.handler = new OzoneManagerHARequestHandlerImpl(ozoneManager,
ozoneManagerDoubleBuffer);
}
@ -375,6 +376,11 @@ private Message runCommand(OMRequest request, long trxLogIndex) {
return OMRatisHelper.convertResponseToMessage(response);
}
@SuppressWarnings("HiddenField")
public void updateLastAppliedIndex(long lastAppliedIndex) {
this.lastAppliedIndex = lastAppliedIndex;
}
/**
* Submits read request to OM and returns the response Message.
* @param request OMRequest

View File

@ -55,6 +55,9 @@ public class TestOzoneManagerDoubleBufferWithDummyResponse {
private OMMetadataManager omMetadataManager;
private OzoneManagerDoubleBuffer doubleBuffer;
private AtomicLong trxId = new AtomicLong(0);
private OzoneManagerRatisSnapshot ozoneManagerRatisSnapshot;
private long lastAppliedIndex;
@Rule
public TemporaryFolder folder = new TemporaryFolder();
@ -66,7 +69,11 @@ public void setup() throws IOException {
folder.newFolder().getAbsolutePath());
omMetadataManager =
new OmMetadataManagerImpl(configuration);
doubleBuffer = new OzoneManagerDoubleBuffer(omMetadataManager);
ozoneManagerRatisSnapshot = index -> {
lastAppliedIndex = index;
};
doubleBuffer = new OzoneManagerDoubleBuffer(omMetadataManager,
ozoneManagerRatisSnapshot);
}
@After
@ -94,6 +101,9 @@ public void testDoubleBufferWithDummyResponse() throws Exception {
Assert.assertTrue(omMetadataManager.countRowsInTable(
omMetadataManager.getBucketTable()) == (bucketCount));
Assert.assertTrue(doubleBuffer.getFlushIterations() > 0);
// Check lastAppliedIndex is updated correctly or not.
Assert.assertEquals(bucketCount, lastAppliedIndex);
}
/**

View File

@ -65,6 +65,8 @@ public class TestOzoneManagerDoubleBufferWithOMResponse {
private OMMetadataManager omMetadataManager;
private OzoneManagerDoubleBuffer doubleBuffer;
private AtomicLong trxId = new AtomicLong(0);
private OzoneManagerRatisSnapshot ozoneManagerRatisSnapshot;
private long lastAppliedIndex;
@Rule
public TemporaryFolder folder = new TemporaryFolder();
@ -76,7 +78,11 @@ public void setup() throws IOException {
folder.newFolder().getAbsolutePath());
omMetadataManager =
new OmMetadataManagerImpl(configuration);
doubleBuffer = new OzoneManagerDoubleBuffer(omMetadataManager);
ozoneManagerRatisSnapshot = index -> {
lastAppliedIndex = index;
};
doubleBuffer = new OzoneManagerDoubleBuffer(omMetadataManager,
ozoneManagerRatisSnapshot);
}
@After
@ -146,6 +152,9 @@ public void testDoubleBufferWithMixOfTransactions() throws Exception {
checkCreateBuckets(bucketQueue);
checkDeletedBuckets(deleteBucketQueue);
// Check lastAppliedIndex is updated correctly or not.
Assert.assertEquals(bucketCount + deleteCount + 1, lastAppliedIndex);
}
/**
@ -208,6 +217,9 @@ public void testDoubleBufferWithMixOfTransactionsParallel() throws Exception {
checkCreateBuckets(bucketQueue);
checkDeletedBuckets(deleteBucketQueue);
// Check lastAppliedIndex is updated correctly or not.
Assert.assertEquals(bucketCount + deleteCount + 2, lastAppliedIndex);
}
/**
@ -321,6 +333,8 @@ private void checkDeletedBuckets(Queue<OMBucketDeleteResponse>
public void testDoubleBuffer(int iterations, int bucketCount)
throws Exception {
try {
// Reset transaction id.
trxId.set(0);
// Calling setup and stop here because this method is called from a
// single test multiple times.
setup();
@ -343,6 +357,9 @@ public void testDoubleBuffer(int iterations, int bucketCount)
omMetadataManager.getBucketTable()) == (bucketCount) * iterations);
Assert.assertTrue(doubleBuffer.getFlushIterations() > 0);
// Check lastAppliedIndex is updated correctly or not.
Assert.assertEquals((bucketCount + 1) * iterations, lastAppliedIndex);
} finally {
stop();
}