From 56b7ec71a69820ae12b4b9e2eb04b7368f721dbf Mon Sep 17 00:00:00 2001
From: Jason Lowe
Date: Mon, 26 Jan 2015 16:28:55 +0000
Subject: [PATCH] MAPREDUCE-6141. History server leveldb recovery store.
 Contributed by Jason Lowe

---
 hadoop-mapreduce-project/CHANGES.txt          |   2 +
 .../v2/jobhistory/JHAdminConfig.java          |   7 +
 .../src/main/resources/mapred-default.xml     |   8 +
 .../hadoop-mapreduce-client-hs/pom.xml        |   4 +
 ...HistoryServerLeveldbStateStoreService.java | 379 ++++++++++++++++++
 ...HistoryServerLeveldbStateStoreService.java | 207 ++++++++++
 6 files changed, 607 insertions(+)
 create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryServerLeveldbStateStoreService.java
 create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestHistoryServerLeveldbStateStoreService.java

diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index b28fc65c91..35ceb2e51f 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -262,6 +262,8 @@ Release 2.7.0 - UNRELEASED
     cache with enabling wired encryption at the same time. (Junping Du via
     xgong)
 
+    MAPREDUCE-6141. History server leveldb recovery store (jlowe)
+
   OPTIMIZATIONS
 
     MAPREDUCE-6169. MergeQueue should release reference to the current item
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java
index e5a49b5e1b..f7cba9f806 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java
@@ -197,6 +197,13 @@ public class JHAdminConfig {
   public static final String MR_HS_FS_STATE_STORE_URI =
       MR_HISTORY_PREFIX + "recovery.store.fs.uri";
 
+  /**
+   * The local path where server state will be stored when
+   * HistoryServerLeveldbStateStoreService is configured as the state store.
+   */
+  public static final String MR_HS_LEVELDB_STATE_STORE_PATH =
+      MR_HISTORY_PREFIX + "recovery.store.leveldb.path";
+
   /** Whether to use fixed ports with the minicluster. */
   public static final String MR_HISTORY_MINICLUSTER_FIXED_PORTS =
       MR_HISTORY_PREFIX + "minicluster.fixed.ports";
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
index 57a17a80c7..4535137617 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
@@ -1568,6 +1568,14 @@
   storage class.</description>
 </property>
 
+<property>
+  <name>mapreduce.jobhistory.recovery.store.leveldb.path</name>
+  <value>${hadoop.tmp.dir}/mapred/history/recoverystore</value>
+  <description>The local path where history server state will be stored if
+  HistoryServerLeveldbStateStoreService is configured as the recovery
+  storage class.</description>
+</property>
+
 <property>
   <name>mapreduce.jobhistory.http.policy</name>
   <value>HTTP_ONLY</value>
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/pom.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/pom.xml
index adeb9fa621..fa8162b7c4 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/pom.xml
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/pom.xml
@@ -63,6 +63,10 @@
       <type>test-jar</type>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.fusesource.leveldbjni</groupId>
+      <artifactId>leveldbjni-all</artifactId>
+    </dependency>
   </dependencies>
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryServerLeveldbStateStoreService.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryServerLeveldbStateStoreService.java
new file mode 100644
index 0000000000..16366b1ff8
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryServerLeveldbStateStoreService.java
@@ -0,0 +1,379 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.hadoop.mapreduce.v2.hs; + +import static org.fusesource.leveldbjni.JniDBFactory.asString; +import static org.fusesource.leveldbjni.JniDBFactory.bytes; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.File; +import java.io.IOException; +import java.util.Map.Entry; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.mapreduce.v2.api.MRDelegationTokenIdentifier; +import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig; +import org.apache.hadoop.security.token.delegation.DelegationKey; +import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto; +import org.apache.hadoop.yarn.server.records.Version; +import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl; +import org.apache.hadoop.yarn.server.utils.LeveldbIterator; +import org.fusesource.leveldbjni.JniDBFactory; +import org.fusesource.leveldbjni.internal.NativeDB; +import org.iq80.leveldb.DB; +import org.iq80.leveldb.DBException; +import org.iq80.leveldb.Logger; +import org.iq80.leveldb.Options; + +public class HistoryServerLeveldbStateStoreService extends + HistoryServerStateStoreService { + + private static final String DB_NAME = "mr-jhs-state"; + private static final String DB_SCHEMA_VERSION_KEY = "jhs-schema-version"; + private static final String TOKEN_MASTER_KEY_KEY_PREFIX = "tokens/key_"; + private static final String TOKEN_STATE_KEY_PREFIX = "tokens/token_"; + + private static final Version CURRENT_VERSION_INFO = + Version.newInstance(1, 0); + + private DB db; + + public static final Log LOG = + LogFactory.getLog(HistoryServerLeveldbStateStoreService.class); + + @Override + protected void initStorage(Configuration conf) throws IOException { + } + + @Override + protected void startStorage() throws IOException { + Path storeRoot = createStorageDir(getConfig()); + Options options = new Options(); + options.createIfMissing(false); + options.logger(new LeveldbLogger()); + LOG.info("Using state database at " + storeRoot + " for recovery"); + File dbfile = new File(storeRoot.toString()); + try { + db = JniDBFactory.factory.open(dbfile, options); + } catch (NativeDB.DBException e) { + if (e.isNotFound() || e.getMessage().contains(" does not exist ")) { + LOG.info("Creating state database at " + dbfile); + options.createIfMissing(true); + try { + db = JniDBFactory.factory.open(dbfile, options); + // store version + storeVersion(); + } catch (DBException dbErr) { + throw new IOException(dbErr.getMessage(), dbErr); + } + } else { + throw e; + } + } + checkVersion(); + } + + @Override + protected void closeStorage() throws IOException { + if (db != null) { + db.close(); + db = null; + } + } + + @Override + public HistoryServerState loadState() throws IOException { + HistoryServerState state = new HistoryServerState(); + int numKeys = loadTokenMasterKeys(state); + LOG.info("Recovered " + numKeys + " token master keys"); + int numTokens = loadTokens(state); + LOG.info("Recovered " + numTokens + " tokens"); + return state; + } + + private int loadTokenMasterKeys(HistoryServerState state) + throws IOException { + int numKeys = 0; + LeveldbIterator iter = null; + try { + iter = new LeveldbIterator(db); + 
iter.seek(bytes(TOKEN_MASTER_KEY_KEY_PREFIX));
+      while (iter.hasNext()) {
+        Entry<byte[], byte[]> entry = iter.next();
+        String key = asString(entry.getKey());
+        if (!key.startsWith(TOKEN_MASTER_KEY_KEY_PREFIX)) {
+          break;
+        }
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Loading master key from " + key);
+        }
+        try {
+          loadTokenMasterKey(state, entry.getValue());
+        } catch (IOException e) {
+          throw new IOException("Error loading token master key from " + key,
+              e);
+        }
+        ++numKeys;
+      }
+    } catch (DBException e) {
+      throw new IOException(e);
+    } finally {
+      if (iter != null) {
+        iter.close();
+      }
+    }
+    return numKeys;
+  }
+
+  private void loadTokenMasterKey(HistoryServerState state, byte[] data)
+      throws IOException {
+    DelegationKey key = new DelegationKey();
+    DataInputStream in =
+        new DataInputStream(new ByteArrayInputStream(data));
+    try {
+      key.readFields(in);
+    } finally {
+      IOUtils.cleanup(LOG, in);
+    }
+    state.tokenMasterKeyState.add(key);
+  }
+
+  private int loadTokens(HistoryServerState state) throws IOException {
+    int numTokens = 0;
+    LeveldbIterator iter = null;
+    try {
+      iter = new LeveldbIterator(db);
+      iter.seek(bytes(TOKEN_STATE_KEY_PREFIX));
+      while (iter.hasNext()) {
+        Entry<byte[], byte[]> entry = iter.next();
+        String key = asString(entry.getKey());
+        if (!key.startsWith(TOKEN_STATE_KEY_PREFIX)) {
+          break;
+        }
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Loading token from " + key);
+        }
+        try {
+          loadToken(state, entry.getValue());
+        } catch (IOException e) {
+          throw new IOException("Error loading token state from " + key, e);
+        }
+        ++numTokens;
+      }
+    } catch (DBException e) {
+      throw new IOException(e);
+    } finally {
+      if (iter != null) {
+        iter.close();
+      }
+    }
+    return numTokens;
+  }
+
+  private void loadToken(HistoryServerState state, byte[] data)
+      throws IOException {
+    MRDelegationTokenIdentifier tokenId = new MRDelegationTokenIdentifier();
+    long renewDate;
+    DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
+    try {
+      tokenId.readFields(in);
+      renewDate = in.readLong();
+    } finally {
+      IOUtils.cleanup(LOG, in);
+    }
+    state.tokenState.put(tokenId, renewDate);
+  }
+
+  @Override
+  public void storeToken(MRDelegationTokenIdentifier tokenId, Long renewDate)
+      throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Storing token " + tokenId.getSequenceNumber());
+    }
+
+    ByteArrayOutputStream memStream = new ByteArrayOutputStream();
+    DataOutputStream dataStream = new DataOutputStream(memStream);
+    try {
+      tokenId.write(dataStream);
+      dataStream.writeLong(renewDate);
+      dataStream.close();
+      dataStream = null;
+    } finally {
+      IOUtils.cleanup(LOG, dataStream);
+    }
+
+    String dbKey = getTokenDatabaseKey(tokenId);
+    try {
+      db.put(bytes(dbKey), memStream.toByteArray());
+    } catch (DBException e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  public void updateToken(MRDelegationTokenIdentifier tokenId, Long renewDate)
+      throws IOException {
+    storeToken(tokenId, renewDate);
+  }
+
+  @Override
+  public void removeToken(MRDelegationTokenIdentifier tokenId)
+      throws IOException {
+    String dbKey = getTokenDatabaseKey(tokenId);
+    try {
+      db.delete(bytes(dbKey));
+    } catch (DBException e) {
+      throw new IOException(e);
+    }
+  }
+
+  private String getTokenDatabaseKey(MRDelegationTokenIdentifier tokenId) {
+    return TOKEN_STATE_KEY_PREFIX + tokenId.getSequenceNumber();
+  }
+
+  @Override
+  public void storeTokenMasterKey(DelegationKey masterKey)
+      throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Storing master key " + masterKey.getKeyId());
+    }
+
+    
ByteArrayOutputStream memStream = new ByteArrayOutputStream(); + DataOutputStream dataStream = new DataOutputStream(memStream); + try { + masterKey.write(dataStream); + dataStream.close(); + dataStream = null; + } finally { + IOUtils.cleanup(LOG, dataStream); + } + + String dbKey = getTokenMasterKeyDatabaseKey(masterKey); + try { + db.put(bytes(dbKey), memStream.toByteArray()); + } catch (DBException e) { + throw new IOException(e); + } + } + + @Override + public void removeTokenMasterKey(DelegationKey masterKey) + throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("Removing master key " + masterKey.getKeyId()); + } + + String dbKey = getTokenMasterKeyDatabaseKey(masterKey); + try { + db.delete(bytes(dbKey)); + } catch (DBException e) { + throw new IOException(e); + } + } + + private String getTokenMasterKeyDatabaseKey(DelegationKey masterKey) { + return TOKEN_MASTER_KEY_KEY_PREFIX + masterKey.getKeyId(); + } + + private Path createStorageDir(Configuration conf) throws IOException { + String confPath = conf.get(JHAdminConfig.MR_HS_LEVELDB_STATE_STORE_PATH); + if (confPath == null) { + throw new IOException("No store location directory configured in " + + JHAdminConfig.MR_HS_LEVELDB_STATE_STORE_PATH); + } + Path root = new Path(confPath, DB_NAME); + FileSystem fs = FileSystem.getLocal(conf); + fs.mkdirs(root, new FsPermission((short)0700)); + return root; + } + + Version loadVersion() throws IOException { + byte[] data = db.get(bytes(DB_SCHEMA_VERSION_KEY)); + // if version is not stored previously, treat it as 1.0. + if (data == null || data.length == 0) { + return Version.newInstance(1, 0); + } + Version version = + new VersionPBImpl(VersionProto.parseFrom(data)); + return version; + } + + private void storeVersion() throws IOException { + dbStoreVersion(CURRENT_VERSION_INFO); + } + + void dbStoreVersion(Version state) throws IOException { + String key = DB_SCHEMA_VERSION_KEY; + byte[] data = + ((VersionPBImpl) state).getProto().toByteArray(); + try { + db.put(bytes(key), data); + } catch (DBException e) { + throw new IOException(e); + } + } + + Version getCurrentVersion() { + return CURRENT_VERSION_INFO; + } + + /** + * 1) Versioning scheme: major.minor. For e.g. 1.0, 1.1, 1.2...1.25, 2.0 etc. + * 2) Any incompatible change of state-store is a major upgrade, and any + * compatible change of state-store is a minor upgrade. + * 3) Within a minor upgrade, say 1.1 to 1.2: + * overwrite the version info and proceed as normal. + * 4) Within a major upgrade, say 1.2 to 2.0: + * throw exception and indicate user to use a separate upgrade tool to + * upgrade state or remove incompatible old state. 
+ */ + private void checkVersion() throws IOException { + Version loadedVersion = loadVersion(); + LOG.info("Loaded state version info " + loadedVersion); + if (loadedVersion.equals(getCurrentVersion())) { + return; + } + if (loadedVersion.isCompatibleTo(getCurrentVersion())) { + LOG.info("Storing state version info " + getCurrentVersion()); + storeVersion(); + } else { + throw new IOException( + "Incompatible version for state: expecting state version " + + getCurrentVersion() + ", but loading version " + loadedVersion); + } + } + + private static class LeveldbLogger implements Logger { + private static final Log LOG = LogFactory.getLog(LeveldbLogger.class); + + @Override + public void log(String message) { + LOG.info(message); + } + } +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestHistoryServerLeveldbStateStoreService.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestHistoryServerLeveldbStateStoreService.java new file mode 100644 index 0000000000..2af2f84302 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestHistoryServerLeveldbStateStoreService.java @@ -0,0 +1,207 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.mapreduce.v2.hs; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.File; +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.v2.api.MRDelegationTokenIdentifier; +import org.apache.hadoop.mapreduce.v2.hs.HistoryServerStateStoreService.HistoryServerState; +import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig; +import org.apache.hadoop.security.token.delegation.DelegationKey; +import org.apache.hadoop.service.ServiceStateException; +import org.apache.hadoop.yarn.server.records.Version; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class TestHistoryServerLeveldbStateStoreService { + + private static final File testDir = new File( + System.getProperty("test.build.data", + System.getProperty("java.io.tmpdir")), + "TestHistoryServerLeveldbSystemStateStoreService"); + + private Configuration conf; + + @Before + public void setup() { + FileUtil.fullyDelete(testDir); + testDir.mkdirs(); + conf = new Configuration(); + conf.setBoolean(JHAdminConfig.MR_HS_RECOVERY_ENABLE, true); + conf.setClass(JHAdminConfig.MR_HS_STATE_STORE, + HistoryServerLeveldbStateStoreService.class, + HistoryServerStateStoreService.class); + conf.set(JHAdminConfig.MR_HS_LEVELDB_STATE_STORE_PATH, + testDir.getAbsoluteFile().toString()); + } + + @After + public void cleanup() { + FileUtil.fullyDelete(testDir); + } + + private HistoryServerStateStoreService createAndStartStore() + throws IOException { + HistoryServerStateStoreService store = + HistoryServerStateStoreServiceFactory.getStore(conf); + assertTrue("Factory did not create a leveldb store", + store instanceof HistoryServerLeveldbStateStoreService); + store.init(conf); + store.start(); + return store; + } + + @Test + public void testCheckVersion() throws IOException { + HistoryServerLeveldbStateStoreService store = + new HistoryServerLeveldbStateStoreService(); + store.init(conf); + store.start(); + + // default version + Version defaultVersion = store.getCurrentVersion(); + assertEquals(defaultVersion, store.loadVersion()); + + // compatible version + Version compatibleVersion = + Version.newInstance(defaultVersion.getMajorVersion(), + defaultVersion.getMinorVersion() + 2); + store.dbStoreVersion(compatibleVersion); + assertEquals(compatibleVersion, store.loadVersion()); + store.close(); + store = new HistoryServerLeveldbStateStoreService(); + store.init(conf); + store.start(); + + // overwrite the compatible version + assertEquals(defaultVersion, store.loadVersion()); + + // incompatible version + Version incompatibleVersion = + Version.newInstance(defaultVersion.getMajorVersion() + 1, + defaultVersion.getMinorVersion()); + store.dbStoreVersion(incompatibleVersion); + store.close(); + store = new HistoryServerLeveldbStateStoreService(); + try { + store.init(conf); + store.start(); + fail("Incompatible version, should have thrown before here."); + } catch (ServiceStateException e) { + assertTrue("Exception message mismatch", + e.getMessage().contains("Incompatible version for state:")); + } + store.close(); + } + + @Test + public void testTokenStore() throws IOException { + HistoryServerStateStoreService store = createAndStartStore(); + + // verify initially the store is empty + HistoryServerState state = 
store.loadState(); + assertTrue("token state not empty", state.tokenState.isEmpty()); + assertTrue("key state not empty", state.tokenMasterKeyState.isEmpty()); + + // store a key and some tokens + final DelegationKey key1 = new DelegationKey(1, 2, "keyData1".getBytes()); + final MRDelegationTokenIdentifier token1 = + new MRDelegationTokenIdentifier(new Text("tokenOwner1"), + new Text("tokenRenewer1"), new Text("tokenUser1")); + token1.setSequenceNumber(1); + final Long tokenDate1 = 1L; + final MRDelegationTokenIdentifier token2 = + new MRDelegationTokenIdentifier(new Text("tokenOwner2"), + new Text("tokenRenewer2"), new Text("tokenUser2")); + token2.setSequenceNumber(12345678); + final Long tokenDate2 = 87654321L; + + store.storeTokenMasterKey(key1); + store.storeToken(token1, tokenDate1); + store.storeToken(token2, tokenDate2); + store.close(); + + // verify the key and tokens can be recovered + store = createAndStartStore(); + state = store.loadState(); + assertEquals("incorrect loaded token count", 2, state.tokenState.size()); + assertTrue("missing token 1", state.tokenState.containsKey(token1)); + assertEquals("incorrect token 1 date", tokenDate1, + state.tokenState.get(token1)); + assertTrue("missing token 2", state.tokenState.containsKey(token2)); + assertEquals("incorrect token 2 date", tokenDate2, + state.tokenState.get(token2)); + assertEquals("incorrect master key count", 1, + state.tokenMasterKeyState.size()); + assertTrue("missing master key 1", + state.tokenMasterKeyState.contains(key1)); + + // store some more keys and tokens, remove the previous key and one + // of the tokens, and renew a previous token + final DelegationKey key2 = new DelegationKey(3, 4, "keyData2".getBytes()); + final DelegationKey key3 = new DelegationKey(5, 6, "keyData3".getBytes()); + final MRDelegationTokenIdentifier token3 = + new MRDelegationTokenIdentifier(new Text("tokenOwner3"), + new Text("tokenRenewer3"), new Text("tokenUser3")); + token3.setSequenceNumber(12345679); + final Long tokenDate3 = 87654321L; + + store.removeToken(token1); + store.storeTokenMasterKey(key2); + final Long newTokenDate2 = 975318642L; + store.updateToken(token2, newTokenDate2); + store.removeTokenMasterKey(key1); + store.storeTokenMasterKey(key3); + store.storeToken(token3, tokenDate3); + store.close(); + + // verify the new keys and tokens are recovered, the removed key and + // token are no longer present, and the renewed token has the updated + // expiration date + store = createAndStartStore(); + state = store.loadState(); + assertEquals("incorrect loaded token count", 2, state.tokenState.size()); + assertFalse("token 1 not removed", state.tokenState.containsKey(token1)); + assertTrue("missing token 2", state.tokenState.containsKey(token2)); + assertEquals("incorrect token 2 date", newTokenDate2, + state.tokenState.get(token2)); + assertTrue("missing token 3", state.tokenState.containsKey(token3)); + assertEquals("incorrect token 3 date", tokenDate3, + state.tokenState.get(token3)); + assertEquals("incorrect master key count", 2, + state.tokenMasterKeyState.size()); + assertFalse("master key 1 not removed", + state.tokenMasterKeyState.contains(key1)); + assertTrue("missing master key 2", + state.tokenMasterKeyState.contains(key2)); + assertTrue("missing master key 3", + state.tokenMasterKeyState.contains(key3)); + store.close(); + } +}
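
For reference, a minimal mapred-site.xml sketch for enabling this store. Only the
leveldb path key is introduced by this patch; the recovery.enable and
recovery.store.class property names are assumed to be the values of the existing
JHAdminConfig constants MR_HS_RECOVERY_ENABLE and MR_HS_STATE_STORE that the test
above sets.

<!-- turn on history server recovery -->
<property>
  <name>mapreduce.jobhistory.recovery.enable</name>
  <value>true</value>
</property>
<!-- select the leveldb-backed state store added by this patch -->
<property>
  <name>mapreduce.jobhistory.recovery.store.class</name>
  <value>org.apache.hadoop.mapreduce.v2.hs.HistoryServerLeveldbStateStoreService</value>
</property>
<!-- local directory holding the recovery database -->
<property>
  <name>mapreduce.jobhistory.recovery.store.leveldb.path</name>
  <value>${hadoop.tmp.dir}/mapred/history/recoverystore</value>
</property>

The configured path must be on the local filesystem: createStorageDir() resolves it
with FileSystem.getLocal() and creates the mr-jhs-state subdirectory with 0700
permissions before the leveldb database is opened.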