From a7fba0bc28764e0fb36c335ea60cc58079fe007f Mon Sep 17 00:00:00 2001 From: Jian He Date: Mon, 1 Dec 2014 16:38:25 -0800 Subject: [PATCH] YARN-2765. Added leveldb-based implementation for RMStateStore. Contributed by Jason Lowe --- hadoop-yarn-project/CHANGES.txt | 2 + .../hadoop/yarn/conf/YarnConfiguration.java | 3 + .../src/main/resources/yarn-default.xml | 8 + .../pom.xml | 4 + .../recovery/LeveldbRMStateStore.java | 691 ++++++++++++++++++ .../recovery/RMStateStoreTestBase.java | 2 + .../recovery/TestLeveldbRMStateStore.java | 139 ++++ 7 files changed, 849 insertions(+) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestLeveldbRMStateStore.java diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 755763a462..6a7d55d202 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -51,6 +51,8 @@ Release 2.7.0 - UNRELEASED YARN-2188. [YARN-1492] Client service for cache manager. (Chris Trezzo and Sangjin Lee via kasha) + YARN-2765. Added leveldb-based implementation for RMStateStore. (Jason Lowe + via jianhe) IMPROVEMENTS diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 52bc821f68..41f85ef9e4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -506,6 +506,9 @@ private static void addDeprecatedKeys() { public static final String DEFAULT_FS_RM_STATE_STORE_RETRY_POLICY_SPEC = "2000, 500"; + public static final String RM_LEVELDB_STORE_PATH = RM_PREFIX + + "leveldb-state-store.path"; + /** The maximum number of completed applications RM keeps. */ public static final String RM_MAX_COMPLETED_APPLICATIONS = RM_PREFIX + "max-completed-applications"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 2f549daccd..54c4dbc9a0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -419,6 +419,14 @@ 2000, 500 + + Local path where the RM state will be stored when using + org.apache.hadoop.yarn.server.resourcemanager.recovery.LeveldbRMStateStore + as the value for yarn.resourcemanager.store.class + yarn.resourcemanager.leveldb-state-store.path + ${hadoop.tmp.dir}/yarn/system/rmstore + + Enable RM high-availability. When enabled, (1) The RM starts in the Standby mode by default, and transitions to diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml index 9496517928..9bcc7c879d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml @@ -178,6 +178,10 @@ org.apache.zookeeper zookeeper + + org.fusesource.leveldbjni + leveldbjni-all + org.apache.zookeeper zookeeper diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java new file mode 100644 index 0000000000..38ce37053c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java @@ -0,0 +1,691 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.recovery; + +import static org.fusesource.leveldbjni.JniDBFactory.asString; +import static org.fusesource.leveldbjni.JniDBFactory.bytes; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.File; +import java.io.IOException; +import java.util.Map.Entry; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.security.token.delegation.DelegationKey; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.AMRMTokenSecretManagerStateProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.EpochProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationAttemptStateDataProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationStateDataProto; +import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; +import org.apache.hadoop.yarn.server.records.Version; +import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.Epoch; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMDelegationTokenIdentifierData; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.AMRMTokenSecretManagerStatePBImpl; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; +import org.apache.hadoop.yarn.server.utils.LeveldbIterator; +import org.apache.hadoop.yarn.util.ConverterUtils; +import org.fusesource.leveldbjni.JniDBFactory; +import org.fusesource.leveldbjni.internal.NativeDB; +import org.iq80.leveldb.DB; +import org.iq80.leveldb.DBException; +import org.iq80.leveldb.Logger; +import org.iq80.leveldb.Options; +import org.iq80.leveldb.WriteBatch; + +import com.google.common.annotations.VisibleForTesting; + +public class LeveldbRMStateStore extends RMStateStore { + + public static final Log LOG = + LogFactory.getLog(LeveldbRMStateStore.class); + + private static final String SEPARATOR = "/"; + private static final String DB_NAME = "yarn-rm-state"; + private static final String RM_DT_MASTER_KEY_KEY_PREFIX = + RM_DT_SECRET_MANAGER_ROOT + SEPARATOR + DELEGATION_KEY_PREFIX; + private static final String RM_DT_TOKEN_KEY_PREFIX = + RM_DT_SECRET_MANAGER_ROOT + SEPARATOR + DELEGATION_TOKEN_PREFIX; + private static final String RM_DT_SEQUENCE_NUMBER_KEY = + RM_DT_SECRET_MANAGER_ROOT + SEPARATOR + "RMDTSequentialNumber"; + private static final String RM_APP_KEY_PREFIX = + RM_APP_ROOT + SEPARATOR + ApplicationId.appIdStrPrefix; + + private static final Version CURRENT_VERSION_INFO = Version + .newInstance(1, 0); + + private DB db; + + private String getApplicationNodeKey(ApplicationId appId) { + return RM_APP_ROOT + SEPARATOR + appId; + } + + private String getApplicationAttemptNodeKey(ApplicationAttemptId attemptId) { + return getApplicationAttemptNodeKey( + getApplicationNodeKey(attemptId.getApplicationId()), attemptId); + } + + private String getApplicationAttemptNodeKey(String appNodeKey, + ApplicationAttemptId attemptId) { + return appNodeKey + SEPARATOR + attemptId; + } + + private String getRMDTMasterKeyNodeKey(DelegationKey masterKey) { + return RM_DT_MASTER_KEY_KEY_PREFIX + masterKey.getKeyId(); + } + + private String getRMDTTokenNodeKey(RMDelegationTokenIdentifier tokenId) { + return RM_DT_TOKEN_KEY_PREFIX + tokenId.getSequenceNumber(); + } + + @Override + protected void initInternal(Configuration conf) throws Exception { + } + + private Path getStorageDir() throws IOException { + Configuration conf = getConfig(); + String storePath = conf.get(YarnConfiguration.RM_LEVELDB_STORE_PATH); + if (storePath == null) { + throw new IOException("No store location directory configured in " + + YarnConfiguration.RM_LEVELDB_STORE_PATH); + } + return new Path(storePath, DB_NAME); + } + + private Path createStorageDir() throws IOException { + Path root = getStorageDir(); + FileSystem fs = FileSystem.getLocal(getConfig()); + fs.mkdirs(root, new FsPermission((short)0700)); + return root; + } + + @Override + protected void startInternal() throws Exception { + Path storeRoot = createStorageDir(); + Options options = new Options(); + options.createIfMissing(false); + options.logger(new LeveldbLogger()); + LOG.info("Using state database at " + storeRoot + " for recovery"); + File dbfile = new File(storeRoot.toString()); + try { + db = JniDBFactory.factory.open(dbfile, options); + } catch (NativeDB.DBException e) { + if (e.isNotFound() || e.getMessage().contains(" does not exist ")) { + LOG.info("Creating state database at " + dbfile); + options.createIfMissing(true); + try { + db = JniDBFactory.factory.open(dbfile, options); + // store version + storeVersion(); + } catch (DBException dbErr) { + throw new IOException(dbErr.getMessage(), dbErr); + } + } else { + throw e; + } + } + } + + @Override + protected void closeInternal() throws Exception { + if (db != null) { + db.close(); + db = null; + } + } + + @VisibleForTesting + boolean isClosed() { + return db == null; + } + + @Override + protected Version loadVersion() throws Exception { + Version version = null; + try { + byte[] data = db.get(bytes(VERSION_NODE)); + if (data != null) { + version = new VersionPBImpl(VersionProto.parseFrom(data)); + } + } catch (DBException e) { + throw new IOException(e); + } + return version; + } + + @Override + protected void storeVersion() throws Exception { + dbStoreVersion(CURRENT_VERSION_INFO); + } + + void dbStoreVersion(Version state) throws IOException { + String key = VERSION_NODE; + byte[] data = ((VersionPBImpl) state).getProto().toByteArray(); + try { + db.put(bytes(key), data); + } catch (DBException e) { + throw new IOException(e); + } + } + + @Override + protected Version getCurrentVersion() { + return CURRENT_VERSION_INFO; + } + + @Override + public synchronized long getAndIncrementEpoch() throws Exception { + long currentEpoch = 0; + byte[] dbKeyBytes = bytes(EPOCH_NODE); + try { + byte[] data = db.get(dbKeyBytes); + if (data != null) { + currentEpoch = EpochProto.parseFrom(data).getEpoch(); + } + EpochProto proto = Epoch.newInstance(currentEpoch + 1).getProto(); + db.put(dbKeyBytes, proto.toByteArray()); + } catch (DBException e) { + throw new IOException(e); + } + return currentEpoch; + } + + @Override + public RMState loadState() throws Exception { + RMState rmState = new RMState(); + loadRMDTSecretManagerState(rmState); + loadRMApps(rmState); + loadAMRMTokenSecretManagerState(rmState); + return rmState; + } + + private void loadRMDTSecretManagerState(RMState state) throws IOException { + int numKeys = loadRMDTSecretManagerKeys(state); + LOG.info("Recovered " + numKeys + " RM delegation token master keys"); + int numTokens = loadRMDTSecretManagerTokens(state); + LOG.info("Recovered " + numTokens + " RM delegation tokens"); + loadRMDTSecretManagerTokenSequenceNumber(state); + } + + private int loadRMDTSecretManagerKeys(RMState state) throws IOException { + int numKeys = 0; + LeveldbIterator iter = null; + try { + iter = new LeveldbIterator(db); + iter.seek(bytes(RM_DT_MASTER_KEY_KEY_PREFIX)); + while (iter.hasNext()) { + Entry entry = iter.next(); + String key = asString(entry.getKey()); + if (!key.startsWith(RM_DT_MASTER_KEY_KEY_PREFIX)) { + break; + } + DelegationKey masterKey = loadDelegationKey(entry.getValue()); + state.rmSecretManagerState.masterKeyState.add(masterKey); + ++numKeys; + if (LOG.isDebugEnabled()) { + LOG.debug("Loaded RM delegation key from " + key + + ": keyId=" + masterKey.getKeyId() + + ", expirationDate=" + masterKey.getExpiryDate()); + } + } + } catch (DBException e) { + throw new IOException(e); + } finally { + if (iter != null) { + iter.close(); + } + } + return numKeys; + } + + private DelegationKey loadDelegationKey(byte[] data) throws IOException { + DelegationKey key = new DelegationKey(); + DataInputStream in = new DataInputStream(new ByteArrayInputStream(data)); + try { + key.readFields(in); + } finally { + IOUtils.cleanup(LOG, in); + } + return key; + } + + private int loadRMDTSecretManagerTokens(RMState state) throws IOException { + int numTokens = 0; + LeveldbIterator iter = null; + try { + iter = new LeveldbIterator(db); + iter.seek(bytes(RM_DT_TOKEN_KEY_PREFIX)); + while (iter.hasNext()) { + Entry entry = iter.next(); + String key = asString(entry.getKey()); + if (!key.startsWith(RM_DT_TOKEN_KEY_PREFIX)) { + break; + } + RMDelegationTokenIdentifierData tokenData = loadDelegationToken( + entry.getValue()); + RMDelegationTokenIdentifier tokenId = tokenData.getTokenIdentifier(); + long renewDate = tokenData.getRenewDate(); + state.rmSecretManagerState.delegationTokenState.put(tokenId, + renewDate); + ++numTokens; + if (LOG.isDebugEnabled()) { + LOG.debug("Loaded RM delegation token from " + key + + ": tokenId=" + tokenId + ", renewDate=" + renewDate); + } + } + } catch (DBException e) { + throw new IOException(e); + } finally { + if (iter != null) { + iter.close(); + } + } + return numTokens; + } + + private RMDelegationTokenIdentifierData loadDelegationToken(byte[] data) + throws IOException { + RMDelegationTokenIdentifierData tokenData = + new RMDelegationTokenIdentifierData(); + DataInputStream in = new DataInputStream(new ByteArrayInputStream(data)); + try { + tokenData.readFields(in); + } finally { + IOUtils.cleanup(LOG, in); + } + return tokenData; + } + + private void loadRMDTSecretManagerTokenSequenceNumber(RMState state) + throws IOException { + byte[] data = null; + try { + data = db.get(bytes(RM_DT_SEQUENCE_NUMBER_KEY)); + } catch (DBException e) { + throw new IOException(e); + } + if (data != null) { + DataInputStream in = new DataInputStream(new ByteArrayInputStream(data)); + try { + state.rmSecretManagerState.dtSequenceNumber = in.readInt(); + } finally { + IOUtils.cleanup(LOG, in); + } + } + } + + private void loadRMApps(RMState state) throws IOException { + int numApps = 0; + int numAppAttempts = 0; + LeveldbIterator iter = null; + try { + iter = new LeveldbIterator(db); + iter.seek(bytes(RM_APP_KEY_PREFIX)); + while (iter.hasNext()) { + Entry entry = iter.next(); + String key = asString(entry.getKey()); + if (!key.startsWith(RM_APP_KEY_PREFIX)) { + break; + } + + String appIdStr = key.substring(RM_APP_ROOT.length() + 1); + if (appIdStr.contains(SEPARATOR)) { + LOG.warn("Skipping extraneous data " + key); + continue; + } + + numAppAttempts += loadRMApp(state, iter, appIdStr, entry.getValue()); + ++numApps; + } + } catch (DBException e) { + throw new IOException(e); + } finally { + if (iter != null) { + iter.close(); + } + } + LOG.info("Recovered " + numApps + " applications and " + numAppAttempts + + " application attempts"); + } + + private int loadRMApp(RMState rmState, LeveldbIterator iter, String appIdStr, + byte[] appData) throws IOException { + ApplicationStateData appState = createApplicationState(appIdStr, appData); + ApplicationId appId = + appState.getApplicationSubmissionContext().getApplicationId(); + rmState.appState.put(appId, appState); + String attemptNodePrefix = getApplicationNodeKey(appId) + SEPARATOR; + while (iter.hasNext()) { + Entry entry = iter.peekNext(); + String key = asString(entry.getKey()); + if (!key.startsWith(attemptNodePrefix)) { + break; + } + + String attemptId = key.substring(attemptNodePrefix.length()); + if (attemptId.startsWith(ApplicationAttemptId.appAttemptIdStrPrefix)) { + ApplicationAttemptStateData attemptState = + createAttemptState(attemptId, entry.getValue()); + appState.attempts.put(attemptState.getAttemptId(), attemptState); + } else { + LOG.warn("Ignoring unknown application key: " + key); + } + iter.next(); + } + int numAttempts = appState.attempts.size(); + if (LOG.isDebugEnabled()) { + LOG.debug("Loaded application " + appId + " with " + numAttempts + + " attempts"); + } + return numAttempts; + } + + private ApplicationStateData createApplicationState(String appIdStr, + byte[] data) throws IOException { + ApplicationId appId = ConverterUtils.toApplicationId(appIdStr); + ApplicationStateDataPBImpl appState = + new ApplicationStateDataPBImpl( + ApplicationStateDataProto.parseFrom(data)); + if (!appId.equals( + appState.getApplicationSubmissionContext().getApplicationId())) { + throw new YarnRuntimeException("The database entry for " + appId + + " contains data for " + + appState.getApplicationSubmissionContext().getApplicationId()); + } + return appState; + } + + @VisibleForTesting + ApplicationStateData loadRMAppState(ApplicationId appId) throws IOException { + String appKey = getApplicationNodeKey(appId); + byte[] data = null; + try { + data = db.get(bytes(appKey)); + } catch (DBException e) { + throw new IOException(e); + } + if (data == null) { + return null; + } + return createApplicationState(appId.toString(), data); + } + + private ApplicationAttemptStateData createAttemptState(String itemName, + byte[] data) throws IOException { + ApplicationAttemptId attemptId = + ConverterUtils.toApplicationAttemptId(itemName); + ApplicationAttemptStateDataPBImpl attemptState = + new ApplicationAttemptStateDataPBImpl( + ApplicationAttemptStateDataProto.parseFrom(data)); + if (!attemptId.equals(attemptState.getAttemptId())) { + throw new YarnRuntimeException("The database entry for " + attemptId + + " contains data for " + attemptState.getAttemptId()); + } + return attemptState; + } + + private void loadAMRMTokenSecretManagerState(RMState rmState) + throws IOException { + try { + byte[] data = db.get(bytes(AMRMTOKEN_SECRET_MANAGER_ROOT)); + if (data != null) { + AMRMTokenSecretManagerStatePBImpl stateData = + new AMRMTokenSecretManagerStatePBImpl( + AMRMTokenSecretManagerStateProto.parseFrom(data)); + rmState.amrmTokenSecretManagerState = + AMRMTokenSecretManagerState.newInstance( + stateData.getCurrentMasterKey(), + stateData.getNextMasterKey()); + } + } catch (DBException e) { + throw new IOException(e); + } + } + + @Override + protected void storeApplicationStateInternal(ApplicationId appId, + ApplicationStateData appStateData) throws IOException { + String key = getApplicationNodeKey(appId); + if (LOG.isDebugEnabled()) { + LOG.debug("Storing state for app " + appId + " at " + key); + } + try { + db.put(bytes(key), appStateData.getProto().toByteArray()); + } catch (DBException e) { + throw new IOException(e); + } + } + + @Override + protected void updateApplicationStateInternal(ApplicationId appId, + ApplicationStateData appStateData) throws IOException { + storeApplicationStateInternal(appId, appStateData); + } + + @Override + protected void storeApplicationAttemptStateInternal( + ApplicationAttemptId attemptId, + ApplicationAttemptStateData attemptStateData) throws IOException { + String key = getApplicationAttemptNodeKey(attemptId); + if (LOG.isDebugEnabled()) { + LOG.debug("Storing state for attempt " + attemptId + " at " + key); + } + try { + db.put(bytes(key), attemptStateData.getProto().toByteArray()); + } catch (DBException e) { + throw new IOException(e); + } + } + + @Override + protected void updateApplicationAttemptStateInternal( + ApplicationAttemptId attemptId, + ApplicationAttemptStateData attemptStateData) throws IOException { + storeApplicationAttemptStateInternal(attemptId, attemptStateData); + } + + @Override + protected void removeApplicationStateInternal(ApplicationStateData appState) + throws IOException { + ApplicationId appId = + appState.getApplicationSubmissionContext().getApplicationId(); + String appKey = getApplicationNodeKey(appId); + try { + WriteBatch batch = db.createWriteBatch(); + try { + batch.delete(bytes(appKey)); + for (ApplicationAttemptId attemptId : appState.attempts.keySet()) { + String attemptKey = getApplicationAttemptNodeKey(appKey, attemptId); + batch.delete(bytes(attemptKey)); + } + if (LOG.isDebugEnabled()) { + LOG.debug("Removing state for app " + appId + " and " + + appState.attempts.size() + " attempts" + " at " + appKey); + } + db.write(batch); + } finally { + batch.close(); + } + } catch (DBException e) { + throw new IOException(e); + } + } + + @Override + protected void storeRMDelegationTokenAndSequenceNumberState( + RMDelegationTokenIdentifier tokenId, Long renewDate, + int latestSequenceNumber) throws IOException { + String tokenKey = getRMDTTokenNodeKey(tokenId); + RMDelegationTokenIdentifierData tokenData = + new RMDelegationTokenIdentifierData(tokenId, renewDate); + ByteArrayOutputStream bs = new ByteArrayOutputStream(); + DataOutputStream ds = new DataOutputStream(bs); + try { + ds.writeInt(latestSequenceNumber); + } finally { + ds.close(); + } + if (LOG.isDebugEnabled()) { + LOG.debug("Storing token to " + tokenKey); + LOG.debug("Storing " + latestSequenceNumber + " to " + + RM_DT_SEQUENCE_NUMBER_KEY); + } + try { + WriteBatch batch = db.createWriteBatch(); + try { + batch.put(bytes(tokenKey), tokenData.toByteArray()); + batch.put(bytes(RM_DT_SEQUENCE_NUMBER_KEY), bs.toByteArray()); + db.write(batch); + } finally { + batch.close(); + } + } catch (DBException e) { + throw new IOException(e); + } + } + + @Override + protected void updateRMDelegationTokenAndSequenceNumberInternal( + RMDelegationTokenIdentifier tokenId, Long renewDate, + int latestSequenceNumber) throws IOException { + storeRMDelegationTokenAndSequenceNumberState(tokenId, renewDate, + latestSequenceNumber); + } + + @Override + protected void removeRMDelegationTokenState( + RMDelegationTokenIdentifier tokenId) throws IOException { + String tokenKey = getRMDTTokenNodeKey(tokenId); + if (LOG.isDebugEnabled()) { + LOG.debug("Removing token at " + tokenKey); + } + try { + db.delete(bytes(tokenKey)); + } catch (DBException e) { + throw new IOException(e); + } + } + + @Override + protected void storeRMDTMasterKeyState(DelegationKey masterKey) + throws IOException { + String dbKey = getRMDTMasterKeyNodeKey(masterKey); + if (LOG.isDebugEnabled()) { + LOG.debug("Storing token master key to " + dbKey); + } + ByteArrayOutputStream os = new ByteArrayOutputStream(); + DataOutputStream out = new DataOutputStream(os); + try { + masterKey.write(out); + } finally { + out.close(); + } + try { + db.put(bytes(dbKey), os.toByteArray()); + } catch (DBException e) { + throw new IOException(e); + } + } + + @Override + protected void removeRMDTMasterKeyState(DelegationKey masterKey) + throws IOException { + String dbKey = getRMDTMasterKeyNodeKey(masterKey); + if (LOG.isDebugEnabled()) { + LOG.debug("Removing token master key at " + dbKey); + } + try { + db.delete(bytes(dbKey)); + } catch (DBException e) { + throw new IOException(e); + } + } + + @Override + public void storeOrUpdateAMRMTokenSecretManagerState( + AMRMTokenSecretManagerState state, boolean isUpdate) { + AMRMTokenSecretManagerState data = + AMRMTokenSecretManagerState.newInstance(state); + byte[] stateData = data.getProto().toByteArray(); + try { + db.put(bytes(AMRMTOKEN_SECRET_MANAGER_ROOT), stateData); + } catch (DBException e) { + notifyStoreOperationFailed(e); + } + } + + @Override + public void deleteStore() throws IOException { + Path root = getStorageDir(); + LOG.info("Deleting state database at " + root); + db.close(); + db = null; + FileSystem fs = FileSystem.getLocal(getConfig()); + fs.delete(root, true); + } + + @VisibleForTesting + int getNumEntriesInDatabase() throws IOException { + int numEntries = 0; + LeveldbIterator iter = null; + try { + iter = new LeveldbIterator(db); + iter.seekToFirst(); + while (iter.hasNext()) { + Entry entry = iter.next(); + LOG.info("entry: " + asString(entry.getKey())); + ++numEntries; + } + } catch (DBException e) { + throw new IOException(e); + } finally { + if (iter != null) { + iter.close(); + } + } + return numEntries; + } + + private static class LeveldbLogger implements Logger { + private static final Log LOG = LogFactory.getLog(LeveldbLogger.class); + + @Override + public void log(String message) { + LOG.info(message); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java index 8d6a7b6938..3d07b377f2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java @@ -621,6 +621,7 @@ public void testAMRMTokenSecretManagerStateStore( // load state store = stateStoreHelper.getRMStateStore(); + when(rmContext.getStateStore()).thenReturn(store); store.setRMDispatcher(dispatcher); RMState state = store.loadState(); Assert.assertNotNull(state.getAMRMTokenSecretManagerState()); @@ -640,6 +641,7 @@ public void testAMRMTokenSecretManagerStateStore( // load state store = stateStoreHelper.getRMStateStore(); + when(rmContext.getStateStore()).thenReturn(store); store.setRMDispatcher(dispatcher); RMState state_2 = store.loadState(); Assert.assertNotNull(state_2.getAMRMTokenSecretManagerState()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestLeveldbRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestLeveldbRMStateStore.java new file mode 100644 index 0000000000..ae885d2df4 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestLeveldbRMStateStore.java @@ -0,0 +1,139 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.recovery; + +import java.io.File; +import java.io.IOException; + +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.records.Version; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class TestLeveldbRMStateStore extends RMStateStoreTestBase { + + private static final File TEST_DIR = new File( + System.getProperty("test.build.data", + System.getProperty("java.io.tmpdir")), + TestLeveldbRMStateStore.class.getName()); + + private YarnConfiguration conf; + private LeveldbRMStateStore stateStore = null; + + @Before + public void setup() throws IOException { + FileUtil.fullyDelete(TEST_DIR); + conf = new YarnConfiguration(); + conf.set(YarnConfiguration.RM_LEVELDB_STORE_PATH, TEST_DIR.toString()); + } + + @After + public void cleanup() throws IOException { + if (stateStore != null) { + stateStore.close(); + } + FileUtil.fullyDelete(TEST_DIR); + } + + @Test(timeout = 60000) + public void testApps() throws Exception { + LeveldbStateStoreTester tester = new LeveldbStateStoreTester(); + testRMAppStateStore(tester); + } + + @Test(timeout = 60000) + public void testClientTokens() throws Exception { + LeveldbStateStoreTester tester = new LeveldbStateStoreTester(); + testRMDTSecretManagerStateStore(tester); + } + + @Test(timeout = 60000) + public void testVersion() throws Exception { + LeveldbStateStoreTester tester = new LeveldbStateStoreTester(); + testCheckVersion(tester); + } + + @Test(timeout = 60000) + public void testEpoch() throws Exception { + LeveldbStateStoreTester tester = new LeveldbStateStoreTester(); + testEpoch(tester); + } + + @Test(timeout = 60000) + public void testAppDeletion() throws Exception { + LeveldbStateStoreTester tester = new LeveldbStateStoreTester(); + testAppDeletion(tester); + } + + @Test(timeout = 60000) + public void testDeleteStore() throws Exception { + LeveldbStateStoreTester tester = new LeveldbStateStoreTester(); + testDeleteStore(tester); + } + + @Test(timeout = 60000) + public void testAMTokens() throws Exception { + LeveldbStateStoreTester tester = new LeveldbStateStoreTester(); + testAMRMTokenSecretManagerStateStore(tester); + } + + class LeveldbStateStoreTester implements RMStateStoreHelper { + + @Override + public RMStateStore getRMStateStore() throws Exception { + if (stateStore != null) { + stateStore.close(); + } + stateStore = new LeveldbRMStateStore(); + stateStore.init(conf); + stateStore.start(); + return stateStore; + } + + @Override + public boolean isFinalStateValid() throws Exception { + // There should be 6 total entries: + // 1 entry for version + // 2 entries for app 0010 with one attempt + // 3 entries for app 0001 with two attempts + return stateStore.getNumEntriesInDatabase() == 6; + } + + @Override + public void writeVersion(Version version) throws Exception { + stateStore.dbStoreVersion(version); + } + + @Override + public Version getCurrentVersion() throws Exception { + return stateStore.getCurrentVersion(); + } + + @Override + public boolean appExists(RMApp app) throws Exception { + if (stateStore.isClosed()) { + getRMStateStore(); + } + return stateStore.loadRMAppState(app.getApplicationId()) != null; + } + } +}