diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerHAProtocol.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerHAProtocol.java index 675c814cf1..1434dca4c5 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerHAProtocol.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerHAProtocol.java @@ -29,10 +29,9 @@ public interface OzoneManagerHAProtocol { /** * Store the snapshot index i.e. the raft log index, corresponding to the * last transaction applied to the OM RocksDB, in OM metadata dir on disk. - * @param flush flush the OM DB to disk if true * @return the snapshot index * @throws IOException */ - long saveRatisSnapshot(boolean flush) throws IOException; + long saveRatisSnapshot() throws IOException; } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java index 58675e6629..32a485b9aa 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java @@ -236,19 +236,20 @@ private Map createOMService() throws IOException, for (int i = 1; i<= numOfOMs; i++) { // Set nodeId String nodeId = nodeIdBaseStr + i; - conf.set(OMConfigKeys.OZONE_OM_NODE_ID_KEY, nodeId); + OzoneConfiguration config = new OzoneConfiguration(conf); + config.set(OMConfigKeys.OZONE_OM_NODE_ID_KEY, nodeId); // Set the OM http(s) address to null so that the cluster picks // up the address set with service ID and node ID in initHAConfig - conf.set(OMConfigKeys.OZONE_OM_HTTP_ADDRESS_KEY, ""); - conf.set(OMConfigKeys.OZONE_OM_HTTPS_ADDRESS_KEY, ""); + config.set(OMConfigKeys.OZONE_OM_HTTP_ADDRESS_KEY, ""); + 
config.set(OMConfigKeys.OZONE_OM_HTTPS_ADDRESS_KEY, ""); // Set metadata/DB dir base path String metaDirPath = path + "/" + nodeId; - conf.set(OZONE_METADATA_DIRS, metaDirPath); - OMStorage omStore = new OMStorage(conf); + config.set(OZONE_METADATA_DIRS, metaDirPath); + OMStorage omStore = new OMStorage(config); initializeOmStorage(omStore); - OzoneManager om = OzoneManager.createOm(conf); + OzoneManager om = OzoneManager.createOm(config); om.setCertClient(certClient); omMap.put(nodeId, om); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java index 6ac28c3de6..ad8020bf9f 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java @@ -133,7 +133,6 @@ public void testInstallSnapshot() throws Exception { long leaderOMappliedLogIndex = leaderRatisServer.getStateMachineLastAppliedIndex(); - leaderOM.getOmRatisServer().getStateMachineLastAppliedIndex(); List keys = new ArrayList<>(); while (leaderOMappliedLogIndex < 2000) { @@ -143,7 +142,7 @@ public void testInstallSnapshot() throws Exception { } // Get the latest db checkpoint from the leader OM. 
- long leaderOMSnaphsotIndex = leaderOM.saveRatisSnapshot(true); + long leaderOMSnaphsotIndex = leaderOM.saveRatisSnapshot(); DBCheckpoint leaderDbCheckpoint = leaderOM.getMetadataManager().getStore().getCheckpoint(false); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java index e8242c6301..d46e4c61d7 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java @@ -19,6 +19,7 @@ import java.io.IOException; import java.net.ConnectException; import java.net.InetSocketAddress; +import java.util.ArrayList; import java.util.BitSet; import java.util.Collections; import java.util.HashMap; @@ -100,6 +101,7 @@ public class TestOzoneManagerHA { private String scmId; private int numOfOMs = 3; private static final long SNAPSHOT_THRESHOLD = 50; + private static final int LOG_PURGE_GAP = 50; @Rule public ExpectedException exception = ExpectedException.none(); @@ -126,6 +128,7 @@ public void init() throws Exception { conf.setLong( OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_AUTO_TRIGGER_THRESHOLD_KEY, SNAPSHOT_THRESHOLD); + conf.setInt(OMConfigKeys.OZONE_OM_RATIS_LOG_PURGE_GAP, LOG_PURGE_GAP); cluster = (MiniOzoneHAClusterImpl) MiniOzoneCluster.newHABuilder(conf) .setClusterId(clusterId) .setScmId(scmId) @@ -1087,7 +1090,7 @@ public void testOMRatisSnapshot() throws Exception { } GenericTestUtils.waitFor(() -> { - if (ozoneManager.loadRatisSnapshotIndex() > 0) { + if (ozoneManager.getRatisSnapshotIndex() > 0) { return true; } return false; @@ -1097,7 +1100,7 @@ public void testOMRatisSnapshot() throws Exception { // than or equal to the saved snapshot index. 
long smLastAppliedIndex = ozoneManager.getOmRatisServer().getStateMachineLastAppliedIndex(); - long ratisSnapshotIndex = ozoneManager.loadRatisSnapshotIndex(); + long ratisSnapshotIndex = ozoneManager.getRatisSnapshotIndex(); Assert.assertTrue("LastAppliedIndex on OM State Machine (" + smLastAppliedIndex + ") is less than the saved snapshot index(" + ratisSnapshotIndex + ").", @@ -1111,14 +1114,14 @@ public void testOMRatisSnapshot() throws Exception { } GenericTestUtils.waitFor(() -> { - if (ozoneManager.loadRatisSnapshotIndex() > 0) { + if (ozoneManager.getRatisSnapshotIndex() > 0) { return true; } return false; }, 1000, 100000); // The new snapshot index must be greater than the previous snapshot index - long ratisSnapshotIndexNew = ozoneManager.loadRatisSnapshotIndex(); + long ratisSnapshotIndexNew = ozoneManager.getRatisSnapshotIndex(); Assert.assertTrue("Latest snapshot index must be greater than previous " + "snapshot indices", ratisSnapshotIndexNew > ratisSnapshotIndex); @@ -1138,4 +1141,101 @@ static String createKey(OzoneBucket ozoneBucket) throws IOException { ozoneOutputStream.close(); return keyName; } + + @Test + public void testOMRestart() throws Exception { + // Get the leader OM + String leaderOMNodeId = objectStore.getClientProxy().getOMProxyProvider() + .getCurrentProxyOMNodeId(); + OzoneManager leaderOM = cluster.getOzoneManager(leaderOMNodeId); + + // Get follower OMs + OzoneManager followerOM1 = cluster.getOzoneManager( + leaderOM.getPeerNodes().get(0).getOMNodeId()); + OzoneManager followerOM2 = cluster.getOzoneManager( + leaderOM.getPeerNodes().get(1).getOMNodeId()); + + // Do some transactions so that the log index increases + String userName = "user" + RandomStringUtils.randomNumeric(5); + String adminName = "admin" + RandomStringUtils.randomNumeric(5); + String volumeName = "volume" + RandomStringUtils.randomNumeric(5); + String bucketName = "bucket" + RandomStringUtils.randomNumeric(5); + + VolumeArgs createVolumeArgs = 
VolumeArgs.newBuilder() + .setOwner(userName) + .setAdmin(adminName) + .build(); + + objectStore.createVolume(volumeName, createVolumeArgs); + OzoneVolume retVolumeinfo = objectStore.getVolume(volumeName); + + retVolumeinfo.createBucket(bucketName); + OzoneBucket ozoneBucket = retVolumeinfo.getBucket(bucketName); + + for (int i = 0; i < 10; i++) { + createKey(ozoneBucket); + } + + long lastAppliedTxOnFollowerOM = + followerOM1.getOmRatisServer().getStateMachineLastAppliedIndex(); + + // Stop one follower OM + followerOM1.stop(); + + // Do more transactions. Stopped OM should miss these transactions and + // the logs corresponding to at least some of the missed transactions + // should be purged. This will force the OM to install snapshot when + // restarted. + long minNewTxIndex = lastAppliedTxOnFollowerOM + (LOG_PURGE_GAP * 10); + long leaderOMappliedLogIndex = leaderOM.getOmRatisServer() + .getStateMachineLastAppliedIndex(); + + List<String> missedKeys = new ArrayList<>(); + while (leaderOMappliedLogIndex < minNewTxIndex) { + missedKeys.add(createKey(ozoneBucket)); + leaderOMappliedLogIndex = leaderOM.getOmRatisServer() + .getStateMachineLastAppliedIndex(); + } + + // Restart the stopped OM. + followerOM1.restart(); + + // Get the latest snapshotIndex from the leader OM. + long leaderOMSnaphsotIndex = leaderOM.saveRatisSnapshot(); + + // The recently started OM should be lagging behind the leader OM. + long followerOMLastAppliedIndex = + followerOM1.getOmRatisServer().getStateMachineLastAppliedIndex(); + Assert.assertTrue( + followerOMLastAppliedIndex < leaderOMSnaphsotIndex); + + // Wait for the follower OM to catch up + GenericTestUtils.waitFor(() -> { + long lastAppliedIndex = + followerOM1.getOmRatisServer().getStateMachineLastAppliedIndex(); + if (lastAppliedIndex >= leaderOMSnaphsotIndex) { + return true; + } + return false; + }, 100, 200000); + + // Do more transactions. The restarted OM should receive the + // new transactions.
Its last applied tx index should increase from the + // last snapshot index after more transactions are applied. + for (int i = 0; i < 10; i++) { + createKey(ozoneBucket); + } + long followerOM1lastAppliedIndex = followerOM1.getOmRatisServer() + .getStateMachineLastAppliedIndex(); + Assert.assertTrue(followerOM1lastAppliedIndex > + leaderOMSnaphsotIndex); + + // The follower OMs should be in sync. There can be a small lag between + // leader OM and follower OMs as txns are applied first on leader OM. + long followerOM2lastAppliedIndex = followerOM2.getOmRatisServer() + .getStateMachineLastAppliedIndex(); + Assert.assertEquals(followerOM1lastAppliedIndex, + followerOM2lastAppliedIndex); + + } } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOMRatisSnapshotInfo.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOMRatisSnapshotInfo.java new file mode 100644 index 0000000000..56fef1a843 --- /dev/null +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOMRatisSnapshotInfo.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.ozone.om.snapshot; + +import org.apache.hadoop.ozone.om.ratis.OMRatisSnapshotInfo; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.util.Random; + +/** + * Tests {@link org.apache.hadoop.ozone.om.ratis.OMRatisSnapshotInfo}. + */ +public class TestOMRatisSnapshotInfo { + + @Rule + public TemporaryFolder folder = new TemporaryFolder(); + + @Test + public void testSaveAndLoadSnapshotInfo() throws Exception { + File rootDir = folder.newFolder(); + OMRatisSnapshotInfo omRatisSnapshotInfo = new OMRatisSnapshotInfo(rootDir); + + // Initially term and index should be 0 and -1 + Assert.assertEquals(0, omRatisSnapshotInfo.getTerm()); + Assert.assertEquals(-1, omRatisSnapshotInfo.getIndex()); + + Random random = new Random(); + int snapshotIndex = random.nextInt(50); + int termIndex = random.nextInt(10); + + // Save snapshotInfo to disk + omRatisSnapshotInfo.updateTerm(termIndex); + omRatisSnapshotInfo.saveRatisSnapshotToDisk(snapshotIndex); + + Assert.assertEquals(termIndex, omRatisSnapshotInfo.getTerm()); + Assert.assertEquals(snapshotIndex, omRatisSnapshotInfo.getIndex()); + + // Load the snapshot file into new SnapshotInfo + OMRatisSnapshotInfo newSnapshotInfo = new OMRatisSnapshotInfo(rootDir); + + // Verify that the snapshot file loaded properly + Assert.assertEquals(termIndex, newSnapshotInfo.getTerm()); + Assert.assertEquals(snapshotIndex, newSnapshotInfo.getIndex()); + } + +} diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOzoneManagerSnapshotProvider.java 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOzoneManagerSnapshotProvider.java index 2f7550ce69..f70579baca 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOzoneManagerSnapshotProvider.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOzoneManagerSnapshotProvider.java @@ -114,7 +114,7 @@ public void testDownloadCheckpoint() throws Exception { DBCheckpoint omSnapshot = followerOM.getOmSnapshotProvider() .getOzoneManagerDBSnapshot(leaderOMNodeId); - long leaderSnapshotIndex = ozoneManager.loadRatisSnapshotIndex(); + long leaderSnapshotIndex = ozoneManager.getRatisSnapshotIndex(); long downloadedSnapshotIndex = omSnapshot.getRatisSnapshotIndex(); // The snapshot index downloaded from leader OM should match the ratis diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java index b36a12880c..292a8ddebc 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java @@ -126,9 +126,9 @@ public void doGet(HttpServletRequest request, HttpServletResponse response) { // ratis snapshot first. This step also included flushing the OM DB. // Hence, we can set flush to false. 
flush = false; - ratisSnapshotIndex = om.saveRatisSnapshot(true); + ratisSnapshotIndex = om.saveRatisSnapshot(); } else { - ratisSnapshotIndex = om.loadRatisSnapshotIndex(); + ratisSnapshotIndex = om.getRatisSnapshotIndex(); } DBCheckpoint checkpoint = omDbStore.getCheckpoint(flush); diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java index bbbd61cada..121b6f635b 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java @@ -67,7 +67,6 @@ import org.apache.hadoop.hdds.server.ServiceRuntimeInfoImpl; import org.apache.hadoop.hdds.tracing.TracingUtil; import org.apache.hadoop.hdfs.DFSUtil; -import org.apache.hadoop.hdfs.util.PersistentLongFile; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.ipc.Client; @@ -81,6 +80,7 @@ import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider; import org.apache.hadoop.ozone.om.helpers.S3SecretValue; import org.apache.hadoop.ozone.om.protocol.OzoneManagerServerProtocol; +import org.apache.hadoop.ozone.om.ratis.OMRatisSnapshotInfo; import org.apache.hadoop.ozone.om.snapshot.OzoneManagerSnapshotProvider; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DBUpdatesRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos @@ -196,7 +196,6 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT; import static org.apache.hadoop.ozone.OzoneConsts.OM_METRICS_FILE; import static org.apache.hadoop.ozone.OzoneConsts.OM_METRICS_TEMP_FILE; -import static org.apache.hadoop.ozone.OzoneConsts.OM_RATIS_SNAPSHOT_INDEX; import static org.apache.hadoop.ozone.OzoneConsts.RPC_PORT; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY; 
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_HANDLER_COUNT_DEFAULT; @@ -284,8 +283,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl private OMNodeDetails omNodeDetails; private List peerNodes; private File omRatisSnapshotDir; - private final File ratisSnapshotFile; - private long snapshotIndex; + private final OMRatisSnapshotInfo omRatisSnapshotInfo; private final Collection ozAdmins; private KeyProviderCryptoExtension kmsProvider = null; @@ -388,6 +386,9 @@ private OzoneManager(OzoneConfiguration conf) throws IOException, instantiateServices(); + this.omRatisSnapshotInfo = new OMRatisSnapshotInfo( + omStorage.getCurrentDir()); + initializeRatisServer(); initializeRatisClient(); @@ -409,10 +410,6 @@ private OzoneManager(OzoneConfiguration conf) throws IOException, } } - this.ratisSnapshotFile = new File(omStorage.getCurrentDir(), - OM_RATIS_SNAPSHOT_INDEX); - this.snapshotIndex = loadRatisSnapshotIndex(); - metrics = OMMetrics.create(); // Start Om Rpc Server. 
@@ -1313,7 +1310,8 @@ public void restart() throws IOException { HddsUtils.initializeMetrics(configuration, "OzoneManager"); - metadataManager.start(configuration); + instantiateServices(); + startSecretManagerIfNecessary(); // Set metrics and start metrics back ground thread @@ -1334,7 +1332,6 @@ public void restart() throws IOException { metricsTimer = new Timer(); metricsTimer.schedule(scheduleOMMetricsWriteTask, 0, period); - keyManager.start(configuration); omRpcServer = getRpcServer(configuration); omRpcServer.start(); isOmRpcServerRunning = true; @@ -1420,31 +1417,23 @@ private void initializeRatisClient() throws IOException { } } + public OMRatisSnapshotInfo getSnapshotInfo() { + return omRatisSnapshotInfo; + } + @VisibleForTesting - public long loadRatisSnapshotIndex() { - if (ratisSnapshotFile.exists()) { - try { - return PersistentLongFile.readFile(ratisSnapshotFile, 0); - } catch (IOException e) { - LOG.error("Unable to read the ratis snapshot index (last applied " + - "transaction log index)", e); - } - } - return 0; + public long getRatisSnapshotIndex() { + return omRatisSnapshotInfo.getIndex(); } @Override - public long saveRatisSnapshot(boolean flush) throws IOException { - snapshotIndex = omRatisServer.getStateMachineLastAppliedIndex(); + public long saveRatisSnapshot() throws IOException { + long snapshotIndex = omRatisServer.getStateMachineLastAppliedIndex(); - if (flush) { - // Flush the OM state to disk - metadataManager.getStore().flush(); - } + // Flush the OM state to disk + metadataManager.getStore().flush(); - PersistentLongFile.writeFile(ratisSnapshotFile, snapshotIndex); - LOG.info("Saved Ratis Snapshot on the OM with snapshotIndex {}", - snapshotIndex); + omRatisSnapshotInfo.saveRatisSnapshotToDisk(snapshotIndex); return snapshotIndex; } @@ -1468,9 +1457,11 @@ public void stop() { } if (omRatisServer != null) { omRatisServer.stop(); + omRatisServer = null; } if (omRatisClient != null) { omRatisClient.close(); + omRatisClient = null; } 
isOmRpcServerRunning = false; keyManager.stop(); @@ -3349,8 +3340,7 @@ void reloadOMState(long newSnapshotIndex) throws IOException { // Update OM snapshot index with the new snapshot index (from the new OM // DB state) and save the snapshot index to disk - this.snapshotIndex = newSnapshotIndex; - saveRatisSnapshot(false); + omRatisSnapshotInfo.saveRatisSnapshotToDisk(newSnapshotIndex); } public static Logger getLogger() { diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OMRatisSnapshotInfo.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OMRatisSnapshotInfo.java new file mode 100644 index 0000000000..520c1171f8 --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OMRatisSnapshotInfo.java @@ -0,0 +1,180 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.ozone.om.ratis; + +import org.apache.ratis.server.protocol.TermIndex; +import org.apache.ratis.server.storage.FileInfo; +import org.apache.ratis.statemachine.SnapshotInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.yaml.snakeyaml.DumperOptions; +import org.yaml.snakeyaml.Yaml; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.io.Writer; +import java.util.List; + +import static org.apache.hadoop.ozone.OzoneConsts.OM_RATIS_SNAPSHOT_INDEX; + +/** + * This class captures the snapshotIndex and term of the latest snapshot in + * the OM. + * Ratis server loads the snapshotInfo during startup and updates the + * lastApplied index to this snapshotIndex. OM SnapshotInfo does not contain + * any files. It is used only to store/ update the last applied index and term. 
+ */ +public class OMRatisSnapshotInfo implements SnapshotInfo { + + static final Logger LOG = LoggerFactory.getLogger(OMRatisSnapshotInfo.class); + + private volatile long term = 0; + private volatile long snapshotIndex = -1; + + private final File ratisSnapshotFile; + + public OMRatisSnapshotInfo(File ratisDir) throws IOException { + ratisSnapshotFile = new File(ratisDir, OM_RATIS_SNAPSHOT_INDEX); + loadRatisSnapshotIndex(); + } + + public void updateTerm(long newTerm) { + term = newTerm; + } + + private void updateSnapshotIndex(long newSnapshotIndex) { + snapshotIndex = newSnapshotIndex; + } + + private void updateTermIndex(long newTerm, long newIndex) { + this.term = newTerm; + this.snapshotIndex = newIndex; + } + + /** + * Load the snapshot index and term from the snapshot file on disk, + * if it exists. + * @throws IOException + */ + private void loadRatisSnapshotIndex() throws IOException { + if (ratisSnapshotFile.exists()) { + RatisSnapshotYaml ratisSnapshotYaml = readRatisSnapshotYaml(); + updateTermIndex(ratisSnapshotYaml.term, ratisSnapshotYaml.snapshotIndex); + } + } + + /** + * Read and parse the snapshot yaml file. + */ + private RatisSnapshotYaml readRatisSnapshotYaml() throws IOException { + try (FileInputStream inputFileStream = new FileInputStream( + ratisSnapshotFile)) { + Yaml yaml = new Yaml(); + try { + return yaml.loadAs(inputFileStream, RatisSnapshotYaml.class); + } catch (Exception e) { + throw new IOException("Unable to parse RatisSnapshot yaml file.", e); + } + } + } + + /** + * Update and persist the snapshot index and term to disk. + * @param index new snapshot index to be persisted to disk. + * @throws IOException + */ + public void saveRatisSnapshotToDisk(long index) throws IOException { + updateSnapshotIndex(index); + writeRatisSnapshotYaml(); + LOG.info("Saved Ratis Snapshot on the OM with snapshotIndex {}", index); + } + + /** + * Write snapshot details to disk in yaml format. 
+ */ + private void writeRatisSnapshotYaml() throws IOException { + DumperOptions options = new DumperOptions(); + options.setPrettyFlow(true); + options.setDefaultFlowStyle(DumperOptions.FlowStyle.FLOW); + Yaml yaml = new Yaml(options); + + RatisSnapshotYaml ratisSnapshotYaml = new RatisSnapshotYaml(term, + snapshotIndex); + + try (Writer writer = new OutputStreamWriter( + new FileOutputStream(ratisSnapshotFile), "UTF-8")) { + yaml.dump(ratisSnapshotYaml, writer); + } + } + + @Override + public TermIndex getTermIndex() { + return TermIndex.newTermIndex(term, snapshotIndex); + } + + @Override + public long getTerm() { + return term; + } + + @Override + public long getIndex() { + return snapshotIndex; + } + + @Override + public List<FileInfo> getFiles() { + return null; + } + + /** + * Ratis Snapshot details to be written to the yaml file. + */ + public static class RatisSnapshotYaml { + private long term; + private long snapshotIndex; + + public RatisSnapshotYaml() { + // Needed for snake-yaml introspection.
+ } + + RatisSnapshotYaml(long term, long snapshotIndex) { + this.term = term; + this.snapshotIndex = snapshotIndex; + } + + public void setTerm(long term) { + this.term = term; + } + + public long getTerm() { + return this.term; + } + + public void setSnapshotIndex(long index) { + this.snapshotIndex = index; + } + + public long getSnapshotIndex() { + return this.snapshotIndex; + } + } +} diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java index c51323e3a7..e302956d39 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java @@ -28,8 +28,6 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; -import org.apache.hadoop.ozone.container.common.transport.server.ratis - .ContainerStateMachine; import org.apache.hadoop.ozone.om.OzoneManager; import org.apache.hadoop.ozone.om.exceptions.OMException; import org.apache.hadoop.ozone.om.helpers.OMRatisHelper; @@ -48,6 +46,7 @@ import org.apache.ratis.server.RaftServer; import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.server.storage.RaftStorage; +import org.apache.ratis.statemachine.SnapshotInfo; import org.apache.ratis.statemachine.TransactionContext; import org.apache.ratis.statemachine.impl.BaseStateMachine; import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage; @@ -64,26 +63,33 @@ public class OzoneManagerStateMachine extends BaseStateMachine { static final Logger LOG = - LoggerFactory.getLogger(ContainerStateMachine.class); + LoggerFactory.getLogger(OzoneManagerStateMachine.class); private final SimpleStateMachineStorage storage = new SimpleStateMachineStorage(); private final OzoneManagerRatisServer omRatisServer; 
private final OzoneManager ozoneManager; private OzoneManagerHARequestHandler handler; private RaftGroupId raftGroupId; - private long lastAppliedIndex = 0; + private long lastAppliedIndex; private OzoneManagerDoubleBuffer ozoneManagerDoubleBuffer; + private final OMRatisSnapshotInfo snapshotInfo; private final ExecutorService executorService; private final ExecutorService installSnapshotExecutor; public OzoneManagerStateMachine(OzoneManagerRatisServer ratisServer) { this.omRatisServer = ratisServer; this.ozoneManager = omRatisServer.getOzoneManager(); + + this.snapshotInfo = ozoneManager.getSnapshotInfo(); + updateLastAppliedIndexWithSnaphsotIndex(); + this.ozoneManagerDoubleBuffer = new OzoneManagerDoubleBuffer(ozoneManager.getMetadataManager(), this::updateLastAppliedIndex); + this.handler = new OzoneManagerHARequestHandlerImpl(ozoneManager, ozoneManagerDoubleBuffer); + ThreadFactory build = new ThreadFactoryBuilder().setDaemon(true) .setNameFormat("OM StateMachine ApplyTransaction Thread - %d").build(); this.executorService = HadoopExecutors.newSingleThreadExecutor(build); @@ -103,6 +109,29 @@ public void initialize(RaftServer server, RaftGroupId id, }); } + @Override + public SnapshotInfo getLatestSnapshot() { + return snapshotInfo; + } + + /** + * Called to notify state machine about indexes which are processed + * internally by Raft Server, this currently happens when conf entries are + * processed in raft Server. This keep state machine to keep a track of index + * updates. + * @param term term of the current log entry + * @param index index which is being updated + */ + @Override + public void notifyIndexUpdate(long term, long index) { + // SnapshotInfo should be updated when the term changes. + // The index here refers to the log entry index and the index in + // SnapshotInfo represents the snapshotIndex i.e. the index of the last + // transaction included in the snapshot. Hence, snaphsotInfo#index is not + // updated here. 
+ snapshotInfo.updateTerm(term); + } + /** * Validate/pre-process the incoming update request in the state machine. * @return the content to be written to the log entry. Null means the request @@ -224,7 +253,7 @@ public void unpause(long newLastAppliedSnaphsotIndex) { public long takeSnapshot() throws IOException { LOG.info("Saving Ratis snapshot on the OM."); if (ozoneManager != null) { - return ozoneManager.saveRatisSnapshot(true); + return ozoneManager.saveRatisSnapshot(); } return 0; } @@ -305,6 +334,10 @@ public void updateLastAppliedIndex(long lastAppliedIndex) { this.lastAppliedIndex = lastAppliedIndex; } + public void updateLastAppliedIndexWithSnaphsotIndex() { + this.lastAppliedIndex = snapshotInfo.getIndex(); + } + /** * Submits read request to OM and returns the response Message. * @param request OMRequest diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerRatisServer.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerRatisServer.java index d5958030bc..d613d99286 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerRatisServer.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerRatisServer.java @@ -70,6 +70,7 @@ public class TestOzoneManagerRatisServer { private static final long LEADER_ELECTION_TIMEOUT = 500L; private OMMetadataManager omMetadataManager; private OzoneManager ozoneManager; + private OMNodeDetails omNodeDetails; @Before public void init() throws Exception { @@ -86,7 +87,7 @@ public void init() throws Exception { OMConfigKeys.OZONE_OM_RATIS_PORT_DEFAULT); InetSocketAddress rpcAddress = new InetSocketAddress( InetAddress.getLocalHost(), 0); - OMNodeDetails omNodeDetails = new OMNodeDetails.Builder() + omNodeDetails = new OMNodeDetails.Builder() .setRpcAddress(rpcAddress) .setRatisPort(ratisPort) .setOMNodeId(omID) @@ -99,6 +100,9 @@ public void 
init() throws Exception { folder.newFolder().getAbsolutePath()); omMetadataManager = new OmMetadataManagerImpl(ozoneConfiguration); when(ozoneManager.getMetadataManager()).thenReturn(omMetadataManager); + OMRatisSnapshotInfo omRatisSnapshotInfo = new OMRatisSnapshotInfo( + folder.newFolder()); + when(ozoneManager.getSnapshotInfo()).thenReturn(omRatisSnapshotInfo); omRatisServer = OzoneManagerRatisServer.newOMRatisServer(conf, ozoneManager, omNodeDetails, Collections.emptyList()); omRatisServer.start(); @@ -126,6 +130,24 @@ public void testStartOMRatisServer() throws Exception { LifeCycle.State.RUNNING, omRatisServer.getServerState()); } + @Test + public void testLoadSnapshotInfoOnStart() throws Exception { + // Stop the Ratis server and manually update the snapshotInfo. + long oldSnaphsotIndex = ozoneManager.saveRatisSnapshot(); + ozoneManager.getSnapshotInfo().saveRatisSnapshotToDisk(oldSnaphsotIndex); + omRatisServer.stop(); + long newSnapshotIndex = oldSnaphsotIndex + 100; + ozoneManager.getSnapshotInfo().saveRatisSnapshotToDisk(newSnapshotIndex); + + // Start new Ratis server. It should pick up and load the new SnapshotInfo + omRatisServer = OzoneManagerRatisServer.newOMRatisServer(conf, ozoneManager, + omNodeDetails, Collections.emptyList()); + omRatisServer.start(); + long lastAppliedIndex = omRatisServer.getStateMachineLastAppliedIndex(); + + Assert.assertEquals(newSnapshotIndex, lastAppliedIndex); + } + /** * Test that all of {@link OzoneManagerProtocolProtos.Type} enum values are * categorized in {@link OmUtils#isReadOnly(OMRequest)}. 
@@ -176,7 +198,7 @@ public void verifyRaftGroupIdGenerationWithCustomOmServiceId() throws int ratisPort = 9873; InetSocketAddress rpcAddress = new InetSocketAddress( InetAddress.getLocalHost(), 0); - OMNodeDetails omNodeDetails = new OMNodeDetails.Builder() + OMNodeDetails nodeDetails = new OMNodeDetails.Builder() .setRpcAddress(rpcAddress) .setRatisPort(ratisPort) .setOMNodeId(newOmId) @@ -184,7 +206,7 @@ public void verifyRaftGroupIdGenerationWithCustomOmServiceId() throws .build(); // Starts a single node Ratis server OzoneManagerRatisServer newOmRatisServer = OzoneManagerRatisServer - .newOMRatisServer(newConf, ozoneManager, omNodeDetails, + .newOMRatisServer(newConf, ozoneManager, nodeDetails, Collections.emptyList()); newOmRatisServer.start(); OzoneManagerRatisClient newOmRatisClient = OzoneManagerRatisClient @@ -198,4 +220,6 @@ public void verifyRaftGroupIdGenerationWithCustomOmServiceId() throws Assert.assertEquals(uuid, raftGroupId.getUuid()); Assert.assertEquals(raftGroupId.toByteString().size(), 16); } + + }