HDDS-1827. Load Snapshot info when OM Ratis server starts. (#1130)

This commit is contained in:
Hanisha Koneru 2019-08-23 13:19:13 -07:00 committed by GitHub
parent ebef99dcf4
commit 3f887f3b92
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 447 additions and 56 deletions

View File

@ -29,10 +29,9 @@ public interface OzoneManagerHAProtocol {
/**
* Store the snapshot index i.e. the raft log index, corresponding to the
* last transaction applied to the OM RocksDB, in OM metadata dir on disk.
* @param flush flush the OM DB to disk if true
* @return the snapshot index
* @throws IOException
*/
long saveRatisSnapshot(boolean flush) throws IOException;
long saveRatisSnapshot() throws IOException;
}

View File

@ -236,19 +236,20 @@ private Map<String, OzoneManager> createOMService() throws IOException,
for (int i = 1; i<= numOfOMs; i++) {
// Set nodeId
String nodeId = nodeIdBaseStr + i;
conf.set(OMConfigKeys.OZONE_OM_NODE_ID_KEY, nodeId);
OzoneConfiguration config = new OzoneConfiguration(conf);
config.set(OMConfigKeys.OZONE_OM_NODE_ID_KEY, nodeId);
// Set the OM http(s) address to null so that the cluster picks
// up the address set with service ID and node ID in initHAConfig
conf.set(OMConfigKeys.OZONE_OM_HTTP_ADDRESS_KEY, "");
conf.set(OMConfigKeys.OZONE_OM_HTTPS_ADDRESS_KEY, "");
config.set(OMConfigKeys.OZONE_OM_HTTP_ADDRESS_KEY, "");
config.set(OMConfigKeys.OZONE_OM_HTTPS_ADDRESS_KEY, "");
// Set metadata/DB dir base path
String metaDirPath = path + "/" + nodeId;
conf.set(OZONE_METADATA_DIRS, metaDirPath);
OMStorage omStore = new OMStorage(conf);
config.set(OZONE_METADATA_DIRS, metaDirPath);
OMStorage omStore = new OMStorage(config);
initializeOmStorage(omStore);
OzoneManager om = OzoneManager.createOm(conf);
OzoneManager om = OzoneManager.createOm(config);
om.setCertClient(certClient);
omMap.put(nodeId, om);

View File

@ -133,7 +133,6 @@ public void testInstallSnapshot() throws Exception {
long leaderOMappliedLogIndex =
leaderRatisServer.getStateMachineLastAppliedIndex();
leaderOM.getOmRatisServer().getStateMachineLastAppliedIndex();
List<String> keys = new ArrayList<>();
while (leaderOMappliedLogIndex < 2000) {
@ -143,7 +142,7 @@ public void testInstallSnapshot() throws Exception {
}
// Get the latest db checkpoint from the leader OM.
long leaderOMSnaphsotIndex = leaderOM.saveRatisSnapshot(true);
long leaderOMSnaphsotIndex = leaderOM.saveRatisSnapshot();
DBCheckpoint leaderDbCheckpoint =
leaderOM.getMetadataManager().getStore().getCheckpoint(false);

View File

@ -19,6 +19,7 @@
import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collections;
import java.util.HashMap;
@ -100,6 +101,7 @@ public class TestOzoneManagerHA {
private String scmId;
private int numOfOMs = 3;
private static final long SNAPSHOT_THRESHOLD = 50;
private static final int LOG_PURGE_GAP = 50;
@Rule
public ExpectedException exception = ExpectedException.none();
@ -126,6 +128,7 @@ public void init() throws Exception {
conf.setLong(
OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_AUTO_TRIGGER_THRESHOLD_KEY,
SNAPSHOT_THRESHOLD);
conf.setInt(OMConfigKeys.OZONE_OM_RATIS_LOG_PURGE_GAP, LOG_PURGE_GAP);
cluster = (MiniOzoneHAClusterImpl) MiniOzoneCluster.newHABuilder(conf)
.setClusterId(clusterId)
.setScmId(scmId)
@ -1087,7 +1090,7 @@ public void testOMRatisSnapshot() throws Exception {
}
GenericTestUtils.waitFor(() -> {
if (ozoneManager.loadRatisSnapshotIndex() > 0) {
if (ozoneManager.getRatisSnapshotIndex() > 0) {
return true;
}
return false;
@ -1097,7 +1100,7 @@ public void testOMRatisSnapshot() throws Exception {
// than or equal to the saved snapshot index.
long smLastAppliedIndex =
ozoneManager.getOmRatisServer().getStateMachineLastAppliedIndex();
long ratisSnapshotIndex = ozoneManager.loadRatisSnapshotIndex();
long ratisSnapshotIndex = ozoneManager.getRatisSnapshotIndex();
Assert.assertTrue("LastAppliedIndex on OM State Machine ("
+ smLastAppliedIndex + ") is less than the saved snapshot index("
+ ratisSnapshotIndex + ").",
@ -1111,14 +1114,14 @@ public void testOMRatisSnapshot() throws Exception {
}
GenericTestUtils.waitFor(() -> {
if (ozoneManager.loadRatisSnapshotIndex() > 0) {
if (ozoneManager.getRatisSnapshotIndex() > 0) {
return true;
}
return false;
}, 1000, 100000);
// The new snapshot index must be greater than the previous snapshot index
long ratisSnapshotIndexNew = ozoneManager.loadRatisSnapshotIndex();
long ratisSnapshotIndexNew = ozoneManager.getRatisSnapshotIndex();
Assert.assertTrue("Latest snapshot index must be greater than previous " +
"snapshot indices", ratisSnapshotIndexNew > ratisSnapshotIndex);
@ -1138,4 +1141,101 @@ static String createKey(OzoneBucket ozoneBucket) throws IOException {
ozoneOutputStream.close();
return keyName;
}
@Test
public void testOMRestart() throws Exception {
// Get the leader OM
String leaderOMNodeId = objectStore.getClientProxy().getOMProxyProvider()
.getCurrentProxyOMNodeId();
OzoneManager leaderOM = cluster.getOzoneManager(leaderOMNodeId);
// Get follower OMs
OzoneManager followerOM1 = cluster.getOzoneManager(
leaderOM.getPeerNodes().get(0).getOMNodeId());
OzoneManager followerOM2 = cluster.getOzoneManager(
leaderOM.getPeerNodes().get(1).getOMNodeId());
// Do some transactions so that the log index increases
String userName = "user" + RandomStringUtils.randomNumeric(5);
String adminName = "admin" + RandomStringUtils.randomNumeric(5);
String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
String bucketName = "bucket" + RandomStringUtils.randomNumeric(5);
VolumeArgs createVolumeArgs = VolumeArgs.newBuilder()
.setOwner(userName)
.setAdmin(adminName)
.build();
objectStore.createVolume(volumeName, createVolumeArgs);
OzoneVolume retVolumeinfo = objectStore.getVolume(volumeName);
retVolumeinfo.createBucket(bucketName);
OzoneBucket ozoneBucket = retVolumeinfo.getBucket(bucketName);
for (int i = 0; i < 10; i++) {
createKey(ozoneBucket);
}
long lastAppliedTxOnFollowerOM =
followerOM1.getOmRatisServer().getStateMachineLastAppliedIndex();
// Stop one follower OM
followerOM1.stop();
// Do more transactions. Stopped OM should miss these transactions and
// the logs corresponding to atleast some of the missed transactions
// should be purged. This will force the OM to install snapshot when
// restarted.
long minNewTxIndex = lastAppliedTxOnFollowerOM + (LOG_PURGE_GAP * 10);
long leaderOMappliedLogIndex = leaderOM.getOmRatisServer()
.getStateMachineLastAppliedIndex();
List<String> missedKeys = new ArrayList<>();
while (leaderOMappliedLogIndex < minNewTxIndex) {
missedKeys.add(createKey(ozoneBucket));
leaderOMappliedLogIndex = leaderOM.getOmRatisServer()
.getStateMachineLastAppliedIndex();
}
// Restart the stopped OM.
followerOM1.restart();
// Get the latest snapshotIndex from the leader OM.
long leaderOMSnaphsotIndex = leaderOM.saveRatisSnapshot();
// The recently started OM should be lagging behind the leader OM.
long followerOMLastAppliedIndex =
followerOM1.getOmRatisServer().getStateMachineLastAppliedIndex();
Assert.assertTrue(
followerOMLastAppliedIndex < leaderOMSnaphsotIndex);
// Wait for the follower OM to catch up
GenericTestUtils.waitFor(() -> {
long lastAppliedIndex =
followerOM1.getOmRatisServer().getStateMachineLastAppliedIndex();
if (lastAppliedIndex >= leaderOMSnaphsotIndex) {
return true;
}
return false;
}, 100, 200000);
// Do more transactions. The restarted OM should receive the
// new transactions. It's last applied tx index should increase from the
// last snapshot index after more transactions are applied.
for (int i = 0; i < 10; i++) {
createKey(ozoneBucket);
}
long followerOM1lastAppliedIndex = followerOM1.getOmRatisServer()
.getStateMachineLastAppliedIndex();
Assert.assertTrue(followerOM1lastAppliedIndex >
leaderOMSnaphsotIndex);
// The follower OMs should be in sync. There can be a small lag between
// leader OM and follower OMs as txns are applied first on leader OM.
long followerOM2lastAppliedIndex = followerOM1.getOmRatisServer()
.getStateMachineLastAppliedIndex();
Assert.assertEquals(followerOM1lastAppliedIndex,
followerOM2lastAppliedIndex);
}
}

View File

@ -0,0 +1,65 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.om.snapshot;
import org.apache.hadoop.ozone.om.ratis.OMRatisSnapshotInfo;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.util.Random;
/**
* Tests {@link org.apache.hadoop.ozone.om.ratis.OMRatisSnapshotInfo}.
*/
public class TestOMRatisSnapshotInfo {
@Rule
public TemporaryFolder folder = new TemporaryFolder();
@Test
public void testSaveAndLoadSnapshotInfo() throws Exception {
File rootDir = folder.newFolder();
OMRatisSnapshotInfo omRatisSnapshotInfo = new OMRatisSnapshotInfo(rootDir);
// Initially term and index should be 0 and -1
Assert.assertEquals(0, omRatisSnapshotInfo.getTerm());
Assert.assertEquals(-1, omRatisSnapshotInfo.getIndex());
Random random = new Random();
int snapshotIndex = random.nextInt(50);
int termIndex = random.nextInt(10);
// Save snapshotInfo to disk
omRatisSnapshotInfo.updateTerm(termIndex);
omRatisSnapshotInfo.saveRatisSnapshotToDisk(snapshotIndex);
Assert.assertEquals(termIndex, omRatisSnapshotInfo.getTerm());
Assert.assertEquals(snapshotIndex, omRatisSnapshotInfo.getIndex());
// Load the snapshot file into new SnapshotInfo
OMRatisSnapshotInfo newSnapshotInfo = new OMRatisSnapshotInfo(rootDir);
// Verify that the snapshot file loaded properly
Assert.assertEquals(termIndex, newSnapshotInfo.getTerm());
Assert.assertEquals(snapshotIndex, newSnapshotInfo.getIndex());
}
}

View File

@ -114,7 +114,7 @@ public void testDownloadCheckpoint() throws Exception {
DBCheckpoint omSnapshot = followerOM.getOmSnapshotProvider()
.getOzoneManagerDBSnapshot(leaderOMNodeId);
long leaderSnapshotIndex = ozoneManager.loadRatisSnapshotIndex();
long leaderSnapshotIndex = ozoneManager.getRatisSnapshotIndex();
long downloadedSnapshotIndex = omSnapshot.getRatisSnapshotIndex();
// The snapshot index downloaded from leader OM should match the ratis

View File

@ -126,9 +126,9 @@ public void doGet(HttpServletRequest request, HttpServletResponse response) {
// ratis snapshot first. This step also included flushing the OM DB.
// Hence, we can set flush to false.
flush = false;
ratisSnapshotIndex = om.saveRatisSnapshot(true);
ratisSnapshotIndex = om.saveRatisSnapshot();
} else {
ratisSnapshotIndex = om.loadRatisSnapshotIndex();
ratisSnapshotIndex = om.getRatisSnapshotIndex();
}
DBCheckpoint checkpoint = omDbStore.getCheckpoint(flush);

View File

@ -67,7 +67,6 @@
import org.apache.hadoop.hdds.server.ServiceRuntimeInfoImpl;
import org.apache.hadoop.hdds.tracing.TracingUtil;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.util.PersistentLongFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.ipc.Client;
@ -81,6 +80,7 @@
import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
import org.apache.hadoop.ozone.om.protocol.OzoneManagerServerProtocol;
import org.apache.hadoop.ozone.om.ratis.OMRatisSnapshotInfo;
import org.apache.hadoop.ozone.om.snapshot.OzoneManagerSnapshotProvider;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DBUpdatesRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
@ -196,7 +196,6 @@
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConsts.OM_METRICS_FILE;
import static org.apache.hadoop.ozone.OzoneConsts.OM_METRICS_TEMP_FILE;
import static org.apache.hadoop.ozone.OzoneConsts.OM_RATIS_SNAPSHOT_INDEX;
import static org.apache.hadoop.ozone.OzoneConsts.RPC_PORT;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_HANDLER_COUNT_DEFAULT;
@ -284,8 +283,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
private OMNodeDetails omNodeDetails;
private List<OMNodeDetails> peerNodes;
private File omRatisSnapshotDir;
private final File ratisSnapshotFile;
private long snapshotIndex;
private final OMRatisSnapshotInfo omRatisSnapshotInfo;
private final Collection<String> ozAdmins;
private KeyProviderCryptoExtension kmsProvider = null;
@ -388,6 +386,9 @@ private OzoneManager(OzoneConfiguration conf) throws IOException,
instantiateServices();
this.omRatisSnapshotInfo = new OMRatisSnapshotInfo(
omStorage.getCurrentDir());
initializeRatisServer();
initializeRatisClient();
@ -409,10 +410,6 @@ private OzoneManager(OzoneConfiguration conf) throws IOException,
}
}
this.ratisSnapshotFile = new File(omStorage.getCurrentDir(),
OM_RATIS_SNAPSHOT_INDEX);
this.snapshotIndex = loadRatisSnapshotIndex();
metrics = OMMetrics.create();
// Start Om Rpc Server.
@ -1313,7 +1310,8 @@ public void restart() throws IOException {
HddsUtils.initializeMetrics(configuration, "OzoneManager");
metadataManager.start(configuration);
instantiateServices();
startSecretManagerIfNecessary();
// Set metrics and start metrics back ground thread
@ -1334,7 +1332,6 @@ public void restart() throws IOException {
metricsTimer = new Timer();
metricsTimer.schedule(scheduleOMMetricsWriteTask, 0, period);
keyManager.start(configuration);
omRpcServer = getRpcServer(configuration);
omRpcServer.start();
isOmRpcServerRunning = true;
@ -1420,31 +1417,23 @@ private void initializeRatisClient() throws IOException {
}
}
public OMRatisSnapshotInfo getSnapshotInfo() {
return omRatisSnapshotInfo;
}
@VisibleForTesting
public long loadRatisSnapshotIndex() {
if (ratisSnapshotFile.exists()) {
try {
return PersistentLongFile.readFile(ratisSnapshotFile, 0);
} catch (IOException e) {
LOG.error("Unable to read the ratis snapshot index (last applied " +
"transaction log index)", e);
}
}
return 0;
public long getRatisSnapshotIndex() {
return omRatisSnapshotInfo.getIndex();
}
@Override
public long saveRatisSnapshot(boolean flush) throws IOException {
snapshotIndex = omRatisServer.getStateMachineLastAppliedIndex();
public long saveRatisSnapshot() throws IOException {
long snapshotIndex = omRatisServer.getStateMachineLastAppliedIndex();
if (flush) {
// Flush the OM state to disk
metadataManager.getStore().flush();
}
PersistentLongFile.writeFile(ratisSnapshotFile, snapshotIndex);
LOG.info("Saved Ratis Snapshot on the OM with snapshotIndex {}",
snapshotIndex);
omRatisSnapshotInfo.saveRatisSnapshotToDisk(snapshotIndex);
return snapshotIndex;
}
@ -1468,9 +1457,11 @@ public void stop() {
}
if (omRatisServer != null) {
omRatisServer.stop();
omRatisServer = null;
}
if (omRatisClient != null) {
omRatisClient.close();
omRatisClient = null;
}
isOmRpcServerRunning = false;
keyManager.stop();
@ -3349,8 +3340,7 @@ void reloadOMState(long newSnapshotIndex) throws IOException {
// Update OM snapshot index with the new snapshot index (from the new OM
// DB state) and save the snapshot index to disk
this.snapshotIndex = newSnapshotIndex;
saveRatisSnapshot(false);
omRatisSnapshotInfo.saveRatisSnapshotToDisk(newSnapshotIndex);
}
public static Logger getLogger() {

View File

@ -0,0 +1,180 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.om.ratis;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.storage.FileInfo;
import org.apache.ratis.statemachine.SnapshotInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.yaml.snakeyaml.DumperOptions;
import org.yaml.snakeyaml.Yaml;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.util.List;
import static org.apache.hadoop.ozone.OzoneConsts.OM_RATIS_SNAPSHOT_INDEX;
/**
* This class captures the snapshotIndex and term of the latest snapshot in
* the OM.
* Ratis server loads the snapshotInfo during startup and updates the
* lastApplied index to this snapshotIndex. OM SnapshotInfo does not contain
* any files. It is used only to store/ update the last applied index and term.
*/
public class OMRatisSnapshotInfo implements SnapshotInfo {
static final Logger LOG = LoggerFactory.getLogger(OMRatisSnapshotInfo.class);
private volatile long term = 0;
private volatile long snapshotIndex = -1;
private final File ratisSnapshotFile;
public OMRatisSnapshotInfo(File ratisDir) throws IOException {
ratisSnapshotFile = new File(ratisDir, OM_RATIS_SNAPSHOT_INDEX);
loadRatisSnapshotIndex();
}
public void updateTerm(long newTerm) {
term = newTerm;
}
private void updateSnapshotIndex(long newSnapshotIndex) {
snapshotIndex = newSnapshotIndex;
}
private void updateTermIndex(long newTerm, long newIndex) {
this.term = newTerm;
this.snapshotIndex = newIndex;
}
/**
* Load the snapshot index and term from the snapshot file on disk,
* if it exists.
* @throws IOException
*/
private void loadRatisSnapshotIndex() throws IOException {
if (ratisSnapshotFile.exists()) {
RatisSnapshotYaml ratisSnapshotYaml = readRatisSnapshotYaml();
updateTermIndex(ratisSnapshotYaml.term, ratisSnapshotYaml.snapshotIndex);
}
}
/**
* Read and parse the snapshot yaml file.
*/
private RatisSnapshotYaml readRatisSnapshotYaml() throws IOException {
try (FileInputStream inputFileStream = new FileInputStream(
ratisSnapshotFile)) {
Yaml yaml = new Yaml();
try {
return yaml.loadAs(inputFileStream, RatisSnapshotYaml.class);
} catch (Exception e) {
throw new IOException("Unable to parse RatisSnapshot yaml file.", e);
}
}
}
/**
* Update and persist the snapshot index and term to disk.
* @param index new snapshot index to be persisted to disk.
* @throws IOException
*/
public void saveRatisSnapshotToDisk(long index) throws IOException {
updateSnapshotIndex(index);
writeRatisSnapshotYaml();
LOG.info("Saved Ratis Snapshot on the OM with snapshotIndex {}", index);
}
/**
* Write snapshot details to disk in yaml format.
*/
private void writeRatisSnapshotYaml() throws IOException {
DumperOptions options = new DumperOptions();
options.setPrettyFlow(true);
options.setDefaultFlowStyle(DumperOptions.FlowStyle.FLOW);
Yaml yaml = new Yaml(options);
RatisSnapshotYaml ratisSnapshotYaml = new RatisSnapshotYaml(term,
snapshotIndex);
try (Writer writer = new OutputStreamWriter(
new FileOutputStream(ratisSnapshotFile), "UTF-8")) {
yaml.dump(ratisSnapshotYaml, writer);
}
}
@Override
public TermIndex getTermIndex() {
return TermIndex.newTermIndex(term, snapshotIndex);
}
@Override
public long getTerm() {
return term;
}
@Override
public long getIndex() {
return snapshotIndex;
}
@Override
public List<FileInfo> getFiles() {
return null;
}
/**
* Ratis Snapshot details to be written to the yaml file.
*/
public static class RatisSnapshotYaml {
private long term;
private long snapshotIndex;
public RatisSnapshotYaml() {
// Needed for snake-yaml introspection.
}
RatisSnapshotYaml(long term, long snapshotIndex) {
this.term = term;
this.snapshotIndex = snapshotIndex;
}
public void setTerm(long term) {
this.term = term;
}
public long getTerm() {
return this.term;
}
public void setSnapshotIndex(long index) {
this.snapshotIndex = index;
}
public long getSnapshotIndex() {
return this.snapshotIndex;
}
}
}

View File

@ -28,8 +28,6 @@
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.ozone.container.common.transport.server.ratis
.ContainerStateMachine;
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
@ -48,6 +46,7 @@
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.storage.RaftStorage;
import org.apache.ratis.statemachine.SnapshotInfo;
import org.apache.ratis.statemachine.TransactionContext;
import org.apache.ratis.statemachine.impl.BaseStateMachine;
import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
@ -64,26 +63,33 @@
public class OzoneManagerStateMachine extends BaseStateMachine {
static final Logger LOG =
LoggerFactory.getLogger(ContainerStateMachine.class);
LoggerFactory.getLogger(OzoneManagerStateMachine.class);
private final SimpleStateMachineStorage storage =
new SimpleStateMachineStorage();
private final OzoneManagerRatisServer omRatisServer;
private final OzoneManager ozoneManager;
private OzoneManagerHARequestHandler handler;
private RaftGroupId raftGroupId;
private long lastAppliedIndex = 0;
private long lastAppliedIndex;
private OzoneManagerDoubleBuffer ozoneManagerDoubleBuffer;
private final OMRatisSnapshotInfo snapshotInfo;
private final ExecutorService executorService;
private final ExecutorService installSnapshotExecutor;
public OzoneManagerStateMachine(OzoneManagerRatisServer ratisServer) {
this.omRatisServer = ratisServer;
this.ozoneManager = omRatisServer.getOzoneManager();
this.snapshotInfo = ozoneManager.getSnapshotInfo();
updateLastAppliedIndexWithSnaphsotIndex();
this.ozoneManagerDoubleBuffer =
new OzoneManagerDoubleBuffer(ozoneManager.getMetadataManager(),
this::updateLastAppliedIndex);
this.handler = new OzoneManagerHARequestHandlerImpl(ozoneManager,
ozoneManagerDoubleBuffer);
ThreadFactory build = new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("OM StateMachine ApplyTransaction Thread - %d").build();
this.executorService = HadoopExecutors.newSingleThreadExecutor(build);
@ -103,6 +109,29 @@ public void initialize(RaftServer server, RaftGroupId id,
});
}
@Override
public SnapshotInfo getLatestSnapshot() {
return snapshotInfo;
}
/**
* Called to notify state machine about indexes which are processed
* internally by Raft Server, this currently happens when conf entries are
* processed in raft Server. This keep state machine to keep a track of index
* updates.
* @param term term of the current log entry
* @param index index which is being updated
*/
@Override
public void notifyIndexUpdate(long term, long index) {
// SnapshotInfo should be updated when the term changes.
// The index here refers to the log entry index and the index in
// SnapshotInfo represents the snapshotIndex i.e. the index of the last
// transaction included in the snapshot. Hence, snaphsotInfo#index is not
// updated here.
snapshotInfo.updateTerm(term);
}
/**
* Validate/pre-process the incoming update request in the state machine.
* @return the content to be written to the log entry. Null means the request
@ -224,7 +253,7 @@ public void unpause(long newLastAppliedSnaphsotIndex) {
public long takeSnapshot() throws IOException {
LOG.info("Saving Ratis snapshot on the OM.");
if (ozoneManager != null) {
return ozoneManager.saveRatisSnapshot(true);
return ozoneManager.saveRatisSnapshot();
}
return 0;
}
@ -305,6 +334,10 @@ public void updateLastAppliedIndex(long lastAppliedIndex) {
this.lastAppliedIndex = lastAppliedIndex;
}
public void updateLastAppliedIndexWithSnaphsotIndex() {
this.lastAppliedIndex = snapshotInfo.getIndex();
}
/**
* Submits read request to OM and returns the response Message.
* @param request OMRequest

View File

@ -70,6 +70,7 @@ public class TestOzoneManagerRatisServer {
private static final long LEADER_ELECTION_TIMEOUT = 500L;
private OMMetadataManager omMetadataManager;
private OzoneManager ozoneManager;
private OMNodeDetails omNodeDetails;
@Before
public void init() throws Exception {
@ -86,7 +87,7 @@ public void init() throws Exception {
OMConfigKeys.OZONE_OM_RATIS_PORT_DEFAULT);
InetSocketAddress rpcAddress = new InetSocketAddress(
InetAddress.getLocalHost(), 0);
OMNodeDetails omNodeDetails = new OMNodeDetails.Builder()
omNodeDetails = new OMNodeDetails.Builder()
.setRpcAddress(rpcAddress)
.setRatisPort(ratisPort)
.setOMNodeId(omID)
@ -99,6 +100,9 @@ public void init() throws Exception {
folder.newFolder().getAbsolutePath());
omMetadataManager = new OmMetadataManagerImpl(ozoneConfiguration);
when(ozoneManager.getMetadataManager()).thenReturn(omMetadataManager);
OMRatisSnapshotInfo omRatisSnapshotInfo = new OMRatisSnapshotInfo(
folder.newFolder());
when(ozoneManager.getSnapshotInfo()).thenReturn(omRatisSnapshotInfo);
omRatisServer = OzoneManagerRatisServer.newOMRatisServer(conf, ozoneManager,
omNodeDetails, Collections.emptyList());
omRatisServer.start();
@ -126,6 +130,24 @@ public void testStartOMRatisServer() throws Exception {
LifeCycle.State.RUNNING, omRatisServer.getServerState());
}
@Test
public void testLoadSnapshotInfoOnStart() throws Exception {
// Stop the Ratis server and manually update the snapshotInfo.
long oldSnaphsotIndex = ozoneManager.saveRatisSnapshot();
ozoneManager.getSnapshotInfo().saveRatisSnapshotToDisk(oldSnaphsotIndex);
omRatisServer.stop();
long newSnapshotIndex = oldSnaphsotIndex + 100;
ozoneManager.getSnapshotInfo().saveRatisSnapshotToDisk(newSnapshotIndex);
// Start new Ratis server. It should pick up and load the new SnapshotInfo
omRatisServer = OzoneManagerRatisServer.newOMRatisServer(conf, ozoneManager,
omNodeDetails, Collections.emptyList());
omRatisServer.start();
long lastAppliedIndex = omRatisServer.getStateMachineLastAppliedIndex();
Assert.assertEquals(newSnapshotIndex, lastAppliedIndex);
}
/**
* Test that all of {@link OzoneManagerProtocolProtos.Type} enum values are
* categorized in {@link OmUtils#isReadOnly(OMRequest)}.
@ -176,7 +198,7 @@ public void verifyRaftGroupIdGenerationWithCustomOmServiceId() throws
int ratisPort = 9873;
InetSocketAddress rpcAddress = new InetSocketAddress(
InetAddress.getLocalHost(), 0);
OMNodeDetails omNodeDetails = new OMNodeDetails.Builder()
OMNodeDetails nodeDetails = new OMNodeDetails.Builder()
.setRpcAddress(rpcAddress)
.setRatisPort(ratisPort)
.setOMNodeId(newOmId)
@ -184,7 +206,7 @@ public void verifyRaftGroupIdGenerationWithCustomOmServiceId() throws
.build();
// Starts a single node Ratis server
OzoneManagerRatisServer newOmRatisServer = OzoneManagerRatisServer
.newOMRatisServer(newConf, ozoneManager, omNodeDetails,
.newOMRatisServer(newConf, ozoneManager, nodeDetails,
Collections.emptyList());
newOmRatisServer.start();
OzoneManagerRatisClient newOmRatisClient = OzoneManagerRatisClient
@ -198,4 +220,6 @@ public void verifyRaftGroupIdGenerationWithCustomOmServiceId() throws
Assert.assertEquals(uuid, raftGroupId.getUuid());
Assert.assertEquals(raftGroupId.toByteString().size(), 16);
}
}