diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index b28fc65c91..35ceb2e51f 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -262,6 +262,8 @@ Release 2.7.0 - UNRELEASED
cache with enabling wired encryption at the same time.
(Junping Du via xgong)
+ MAPREDUCE-6141. History server leveldb recovery store (jlowe)
+
OPTIMIZATIONS
MAPREDUCE-6169. MergeQueue should release reference to the current item
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java
index e5a49b5e1b..f7cba9f806 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java
@@ -197,6 +197,13 @@ public class JHAdminConfig {
public static final String MR_HS_FS_STATE_STORE_URI =
MR_HISTORY_PREFIX + "recovery.store.fs.uri";
+ /**
+ * The local path where server state will be stored when
+ * HistoryServerLeveldbStateStoreService is configured as the state store
+ */
+ public static final String MR_HS_LEVELDB_STATE_STORE_PATH =
+ MR_HISTORY_PREFIX + "recovery.store.leveldb.path";
+
/** Whether to use fixed ports with the minicluster. */
public static final String MR_HISTORY_MINICLUSTER_FIXED_PORTS = MR_HISTORY_PREFIX
+ "minicluster.fixed.ports";
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
index 57a17a80c7..4535137617 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
@@ -1568,6 +1568,14 @@
storage class.
+
+ mapreduce.jobhistory.recovery.store.leveldb.path
+ ${hadoop.tmp.dir}/mapred/history/recoverystore
+ The URI where history server state will be stored if
+ HistoryServerLeveldbSystemStateStoreService is configured as the recovery
+ storage class.
+
+
mapreduce.jobhistory.http.policy
HTTP_ONLY
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/pom.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/pom.xml
index adeb9fa621..fa8162b7c4 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/pom.xml
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/pom.xml
@@ -63,6 +63,10 @@
test-jar
test
+
+ org.fusesource.leveldbjni
+ leveldbjni-all
+
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryServerLeveldbStateStoreService.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryServerLeveldbStateStoreService.java
new file mode 100644
index 0000000000..16366b1ff8
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryServerLeveldbStateStoreService.java
@@ -0,0 +1,379 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.hs;
+
+import static org.fusesource.leveldbjni.JniDBFactory.asString;
+import static org.fusesource.leveldbjni.JniDBFactory.bytes;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.mapreduce.v2.api.MRDelegationTokenIdentifier;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
+import org.apache.hadoop.security.token.delegation.DelegationKey;
+import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto;
+import org.apache.hadoop.yarn.server.records.Version;
+import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
+import org.apache.hadoop.yarn.server.utils.LeveldbIterator;
+import org.fusesource.leveldbjni.JniDBFactory;
+import org.fusesource.leveldbjni.internal.NativeDB;
+import org.iq80.leveldb.DB;
+import org.iq80.leveldb.DBException;
+import org.iq80.leveldb.Logger;
+import org.iq80.leveldb.Options;
+
+public class HistoryServerLeveldbStateStoreService extends
+ HistoryServerStateStoreService {
+
+ private static final String DB_NAME = "mr-jhs-state";
+ private static final String DB_SCHEMA_VERSION_KEY = "jhs-schema-version";
+ private static final String TOKEN_MASTER_KEY_KEY_PREFIX = "tokens/key_";
+ private static final String TOKEN_STATE_KEY_PREFIX = "tokens/token_";
+
+ private static final Version CURRENT_VERSION_INFO =
+ Version.newInstance(1, 0);
+
+ private DB db;
+
+ public static final Log LOG =
+ LogFactory.getLog(HistoryServerLeveldbStateStoreService.class);
+
+ @Override
+ protected void initStorage(Configuration conf) throws IOException {
+ }
+
+ @Override
+ protected void startStorage() throws IOException {
+ Path storeRoot = createStorageDir(getConfig());
+ Options options = new Options();
+ options.createIfMissing(false);
+ options.logger(new LeveldbLogger());
+ LOG.info("Using state database at " + storeRoot + " for recovery");
+ File dbfile = new File(storeRoot.toString());
+ try {
+ db = JniDBFactory.factory.open(dbfile, options);
+ } catch (NativeDB.DBException e) {
+ if (e.isNotFound() || e.getMessage().contains(" does not exist ")) {
+ LOG.info("Creating state database at " + dbfile);
+ options.createIfMissing(true);
+ try {
+ db = JniDBFactory.factory.open(dbfile, options);
+ // store version
+ storeVersion();
+ } catch (DBException dbErr) {
+ throw new IOException(dbErr.getMessage(), dbErr);
+ }
+ } else {
+ throw e;
+ }
+ }
+ checkVersion();
+ }
+
+ @Override
+ protected void closeStorage() throws IOException {
+ if (db != null) {
+ db.close();
+ db = null;
+ }
+ }
+
+ @Override
+ public HistoryServerState loadState() throws IOException {
+ HistoryServerState state = new HistoryServerState();
+ int numKeys = loadTokenMasterKeys(state);
+ LOG.info("Recovered " + numKeys + " token master keys");
+ int numTokens = loadTokens(state);
+ LOG.info("Recovered " + numTokens + " tokens");
+ return state;
+ }
+
+ private int loadTokenMasterKeys(HistoryServerState state)
+ throws IOException {
+ int numKeys = 0;
+ LeveldbIterator iter = null;
+ try {
+ iter = new LeveldbIterator(db);
+ iter.seek(bytes(TOKEN_MASTER_KEY_KEY_PREFIX));
+ while (iter.hasNext()) {
+ Entry entry = iter.next();
+ String key = asString(entry.getKey());
+ if (!key.startsWith(TOKEN_MASTER_KEY_KEY_PREFIX)) {
+ break;
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Loading master key from " + key);
+ }
+ try {
+ loadTokenMasterKey(state, entry.getValue());
+ } catch (IOException e) {
+ throw new IOException("Error loading token master key from " + key,
+ e);
+ }
+ ++numKeys;
+ }
+ } catch (DBException e) {
+ throw new IOException(e);
+ } finally {
+ if (iter != null) {
+ iter.close();
+ }
+ }
+ return numKeys;
+ }
+
+ private void loadTokenMasterKey(HistoryServerState state, byte[] data)
+ throws IOException {
+ DelegationKey key = new DelegationKey();
+ DataInputStream in =
+ new DataInputStream(new ByteArrayInputStream(data));
+ try {
+ key.readFields(in);
+ } finally {
+ IOUtils.cleanup(LOG, in);
+ }
+ state.tokenMasterKeyState.add(key);
+ }
+
+ private int loadTokens(HistoryServerState state) throws IOException {
+ int numTokens = 0;
+ LeveldbIterator iter = null;
+ try {
+ iter = new LeveldbIterator(db);
+ iter.seek(bytes(TOKEN_STATE_KEY_PREFIX));
+ while (iter.hasNext()) {
+ Entry entry = iter.next();
+ String key = asString(entry.getKey());
+ if (!key.startsWith(TOKEN_STATE_KEY_PREFIX)) {
+ break;
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Loading token from " + key);
+ }
+ try {
+ loadToken(state, entry.getValue());
+ } catch (IOException e) {
+ throw new IOException("Error loading token state from " + key, e);
+ }
+ ++numTokens;
+ }
+ } catch (DBException e) {
+ throw new IOException(e);
+ } finally {
+ if (iter != null) {
+ iter.close();
+ }
+ }
+ return numTokens;
+ }
+
+ private void loadToken(HistoryServerState state, byte[] data)
+ throws IOException {
+ MRDelegationTokenIdentifier tokenId = new MRDelegationTokenIdentifier();
+ long renewDate;
+ DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
+ try {
+ tokenId.readFields(in);
+ renewDate = in.readLong();
+ } finally {
+ IOUtils.cleanup(LOG, in);
+ }
+ state.tokenState.put(tokenId, renewDate);
+ }
+
+ @Override
+ public void storeToken(MRDelegationTokenIdentifier tokenId, Long renewDate)
+ throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Storing token " + tokenId.getSequenceNumber());
+ }
+
+ ByteArrayOutputStream memStream = new ByteArrayOutputStream();
+ DataOutputStream dataStream = new DataOutputStream(memStream);
+ try {
+ tokenId.write(dataStream);
+ dataStream.writeLong(renewDate);
+ dataStream.close();
+ dataStream = null;
+ } finally {
+ IOUtils.cleanup(LOG, dataStream);
+ }
+
+ String dbKey = getTokenDatabaseKey(tokenId);
+ try {
+ db.put(bytes(dbKey), memStream.toByteArray());
+ } catch (DBException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public void updateToken(MRDelegationTokenIdentifier tokenId, Long renewDate)
+ throws IOException {
+ storeToken(tokenId, renewDate);
+ }
+
+ @Override
+ public void removeToken(MRDelegationTokenIdentifier tokenId)
+ throws IOException {
+ String dbKey = getTokenDatabaseKey(tokenId);
+ try {
+ db.delete(bytes(dbKey));
+ } catch (DBException e) {
+ throw new IOException(e);
+ }
+ }
+
+ private String getTokenDatabaseKey(MRDelegationTokenIdentifier tokenId) {
+ return TOKEN_STATE_KEY_PREFIX + tokenId.getSequenceNumber();
+ }
+
+ @Override
+ public void storeTokenMasterKey(DelegationKey masterKey)
+ throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Storing master key " + masterKey.getKeyId());
+ }
+
+ ByteArrayOutputStream memStream = new ByteArrayOutputStream();
+ DataOutputStream dataStream = new DataOutputStream(memStream);
+ try {
+ masterKey.write(dataStream);
+ dataStream.close();
+ dataStream = null;
+ } finally {
+ IOUtils.cleanup(LOG, dataStream);
+ }
+
+ String dbKey = getTokenMasterKeyDatabaseKey(masterKey);
+ try {
+ db.put(bytes(dbKey), memStream.toByteArray());
+ } catch (DBException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public void removeTokenMasterKey(DelegationKey masterKey)
+ throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Removing master key " + masterKey.getKeyId());
+ }
+
+ String dbKey = getTokenMasterKeyDatabaseKey(masterKey);
+ try {
+ db.delete(bytes(dbKey));
+ } catch (DBException e) {
+ throw new IOException(e);
+ }
+ }
+
+ private String getTokenMasterKeyDatabaseKey(DelegationKey masterKey) {
+ return TOKEN_MASTER_KEY_KEY_PREFIX + masterKey.getKeyId();
+ }
+
+ private Path createStorageDir(Configuration conf) throws IOException {
+ String confPath = conf.get(JHAdminConfig.MR_HS_LEVELDB_STATE_STORE_PATH);
+ if (confPath == null) {
+ throw new IOException("No store location directory configured in " +
+ JHAdminConfig.MR_HS_LEVELDB_STATE_STORE_PATH);
+ }
+ Path root = new Path(confPath, DB_NAME);
+ FileSystem fs = FileSystem.getLocal(conf);
+ fs.mkdirs(root, new FsPermission((short)0700));
+ return root;
+ }
+
+ Version loadVersion() throws IOException {
+ byte[] data = db.get(bytes(DB_SCHEMA_VERSION_KEY));
+ // if version is not stored previously, treat it as 1.0.
+ if (data == null || data.length == 0) {
+ return Version.newInstance(1, 0);
+ }
+ Version version =
+ new VersionPBImpl(VersionProto.parseFrom(data));
+ return version;
+ }
+
+ private void storeVersion() throws IOException {
+ dbStoreVersion(CURRENT_VERSION_INFO);
+ }
+
+ void dbStoreVersion(Version state) throws IOException {
+ String key = DB_SCHEMA_VERSION_KEY;
+ byte[] data =
+ ((VersionPBImpl) state).getProto().toByteArray();
+ try {
+ db.put(bytes(key), data);
+ } catch (DBException e) {
+ throw new IOException(e);
+ }
+ }
+
+ Version getCurrentVersion() {
+ return CURRENT_VERSION_INFO;
+ }
+
+ /**
+ * 1) Versioning scheme: major.minor. For e.g. 1.0, 1.1, 1.2...1.25, 2.0 etc.
+ * 2) Any incompatible change of state-store is a major upgrade, and any
+ * compatible change of state-store is a minor upgrade.
+ * 3) Within a minor upgrade, say 1.1 to 1.2:
+ * overwrite the version info and proceed as normal.
+ * 4) Within a major upgrade, say 1.2 to 2.0:
+ * throw exception and indicate user to use a separate upgrade tool to
+ * upgrade state or remove incompatible old state.
+ */
+ private void checkVersion() throws IOException {
+ Version loadedVersion = loadVersion();
+ LOG.info("Loaded state version info " + loadedVersion);
+ if (loadedVersion.equals(getCurrentVersion())) {
+ return;
+ }
+ if (loadedVersion.isCompatibleTo(getCurrentVersion())) {
+ LOG.info("Storing state version info " + getCurrentVersion());
+ storeVersion();
+ } else {
+ throw new IOException(
+ "Incompatible version for state: expecting state version "
+ + getCurrentVersion() + ", but loading version " + loadedVersion);
+ }
+ }
+
+ private static class LeveldbLogger implements Logger {
+ private static final Log LOG = LogFactory.getLog(LeveldbLogger.class);
+
+ @Override
+ public void log(String message) {
+ LOG.info(message);
+ }
+ }
+}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestHistoryServerLeveldbStateStoreService.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestHistoryServerLeveldbStateStoreService.java
new file mode 100644
index 0000000000..2af2f84302
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestHistoryServerLeveldbStateStoreService.java
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.hs;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.v2.api.MRDelegationTokenIdentifier;
+import org.apache.hadoop.mapreduce.v2.hs.HistoryServerStateStoreService.HistoryServerState;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
+import org.apache.hadoop.security.token.delegation.DelegationKey;
+import org.apache.hadoop.service.ServiceStateException;
+import org.apache.hadoop.yarn.server.records.Version;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestHistoryServerLeveldbStateStoreService {
+
+ private static final File testDir = new File(
+ System.getProperty("test.build.data",
+ System.getProperty("java.io.tmpdir")),
+ "TestHistoryServerLeveldbSystemStateStoreService");
+
+ private Configuration conf;
+
+ @Before
+ public void setup() {
+ FileUtil.fullyDelete(testDir);
+ testDir.mkdirs();
+ conf = new Configuration();
+ conf.setBoolean(JHAdminConfig.MR_HS_RECOVERY_ENABLE, true);
+ conf.setClass(JHAdminConfig.MR_HS_STATE_STORE,
+ HistoryServerLeveldbStateStoreService.class,
+ HistoryServerStateStoreService.class);
+ conf.set(JHAdminConfig.MR_HS_LEVELDB_STATE_STORE_PATH,
+ testDir.getAbsoluteFile().toString());
+ }
+
+ @After
+ public void cleanup() {
+ FileUtil.fullyDelete(testDir);
+ }
+
+ private HistoryServerStateStoreService createAndStartStore()
+ throws IOException {
+ HistoryServerStateStoreService store =
+ HistoryServerStateStoreServiceFactory.getStore(conf);
+ assertTrue("Factory did not create a leveldb store",
+ store instanceof HistoryServerLeveldbStateStoreService);
+ store.init(conf);
+ store.start();
+ return store;
+ }
+
+ @Test
+ public void testCheckVersion() throws IOException {
+ HistoryServerLeveldbStateStoreService store =
+ new HistoryServerLeveldbStateStoreService();
+ store.init(conf);
+ store.start();
+
+ // default version
+ Version defaultVersion = store.getCurrentVersion();
+ assertEquals(defaultVersion, store.loadVersion());
+
+ // compatible version
+ Version compatibleVersion =
+ Version.newInstance(defaultVersion.getMajorVersion(),
+ defaultVersion.getMinorVersion() + 2);
+ store.dbStoreVersion(compatibleVersion);
+ assertEquals(compatibleVersion, store.loadVersion());
+ store.close();
+ store = new HistoryServerLeveldbStateStoreService();
+ store.init(conf);
+ store.start();
+
+ // overwrite the compatible version
+ assertEquals(defaultVersion, store.loadVersion());
+
+ // incompatible version
+ Version incompatibleVersion =
+ Version.newInstance(defaultVersion.getMajorVersion() + 1,
+ defaultVersion.getMinorVersion());
+ store.dbStoreVersion(incompatibleVersion);
+ store.close();
+ store = new HistoryServerLeveldbStateStoreService();
+ try {
+ store.init(conf);
+ store.start();
+ fail("Incompatible version, should have thrown before here.");
+ } catch (ServiceStateException e) {
+ assertTrue("Exception message mismatch",
+ e.getMessage().contains("Incompatible version for state:"));
+ }
+ store.close();
+ }
+
+ @Test
+ public void testTokenStore() throws IOException {
+ HistoryServerStateStoreService store = createAndStartStore();
+
+ // verify initially the store is empty
+ HistoryServerState state = store.loadState();
+ assertTrue("token state not empty", state.tokenState.isEmpty());
+ assertTrue("key state not empty", state.tokenMasterKeyState.isEmpty());
+
+ // store a key and some tokens
+ final DelegationKey key1 = new DelegationKey(1, 2, "keyData1".getBytes());
+ final MRDelegationTokenIdentifier token1 =
+ new MRDelegationTokenIdentifier(new Text("tokenOwner1"),
+ new Text("tokenRenewer1"), new Text("tokenUser1"));
+ token1.setSequenceNumber(1);
+ final Long tokenDate1 = 1L;
+ final MRDelegationTokenIdentifier token2 =
+ new MRDelegationTokenIdentifier(new Text("tokenOwner2"),
+ new Text("tokenRenewer2"), new Text("tokenUser2"));
+ token2.setSequenceNumber(12345678);
+ final Long tokenDate2 = 87654321L;
+
+ store.storeTokenMasterKey(key1);
+ store.storeToken(token1, tokenDate1);
+ store.storeToken(token2, tokenDate2);
+ store.close();
+
+ // verify the key and tokens can be recovered
+ store = createAndStartStore();
+ state = store.loadState();
+ assertEquals("incorrect loaded token count", 2, state.tokenState.size());
+ assertTrue("missing token 1", state.tokenState.containsKey(token1));
+ assertEquals("incorrect token 1 date", tokenDate1,
+ state.tokenState.get(token1));
+ assertTrue("missing token 2", state.tokenState.containsKey(token2));
+ assertEquals("incorrect token 2 date", tokenDate2,
+ state.tokenState.get(token2));
+ assertEquals("incorrect master key count", 1,
+ state.tokenMasterKeyState.size());
+ assertTrue("missing master key 1",
+ state.tokenMasterKeyState.contains(key1));
+
+ // store some more keys and tokens, remove the previous key and one
+ // of the tokens, and renew a previous token
+ final DelegationKey key2 = new DelegationKey(3, 4, "keyData2".getBytes());
+ final DelegationKey key3 = new DelegationKey(5, 6, "keyData3".getBytes());
+ final MRDelegationTokenIdentifier token3 =
+ new MRDelegationTokenIdentifier(new Text("tokenOwner3"),
+ new Text("tokenRenewer3"), new Text("tokenUser3"));
+ token3.setSequenceNumber(12345679);
+ final Long tokenDate3 = 87654321L;
+
+ store.removeToken(token1);
+ store.storeTokenMasterKey(key2);
+ final Long newTokenDate2 = 975318642L;
+ store.updateToken(token2, newTokenDate2);
+ store.removeTokenMasterKey(key1);
+ store.storeTokenMasterKey(key3);
+ store.storeToken(token3, tokenDate3);
+ store.close();
+
+ // verify the new keys and tokens are recovered, the removed key and
+ // token are no longer present, and the renewed token has the updated
+ // expiration date
+ store = createAndStartStore();
+ state = store.loadState();
+ assertEquals("incorrect loaded token count", 2, state.tokenState.size());
+ assertFalse("token 1 not removed", state.tokenState.containsKey(token1));
+ assertTrue("missing token 2", state.tokenState.containsKey(token2));
+ assertEquals("incorrect token 2 date", newTokenDate2,
+ state.tokenState.get(token2));
+ assertTrue("missing token 3", state.tokenState.containsKey(token3));
+ assertEquals("incorrect token 3 date", tokenDate3,
+ state.tokenState.get(token3));
+ assertEquals("incorrect master key count", 2,
+ state.tokenMasterKeyState.size());
+ assertFalse("master key 1 not removed",
+ state.tokenMasterKeyState.contains(key1));
+ assertTrue("missing master key 2",
+ state.tokenMasterKeyState.contains(key2));
+ assertTrue("missing master key 3",
+ state.tokenMasterKeyState.contains(key3));
+ store.close();
+ }
+}