MAPREDUCE-6141. History server leveldb recovery store. Contributed by Jason Lowe

This commit is contained in:
Jason Lowe 2015-01-26 16:28:55 +00:00
parent 902c6ea7e4
commit 56b7ec71a6
6 changed files with 607 additions and 0 deletions

View File

@ -262,6 +262,8 @@ Release 2.7.0 - UNRELEASED
cache with enabling wired encryption at the same time.
(Junping Du via xgong)
MAPREDUCE-6141. History server leveldb recovery store (jlowe)
OPTIMIZATIONS
MAPREDUCE-6169. MergeQueue should release reference to the current item

View File

@ -197,6 +197,13 @@ public class JHAdminConfig {
public static final String MR_HS_FS_STATE_STORE_URI =
MR_HISTORY_PREFIX + "recovery.store.fs.uri";
/**
* The local path where server state will be stored when
* HistoryServerLeveldbStateStoreService is configured as the state store
*/
public static final String MR_HS_LEVELDB_STATE_STORE_PATH =
MR_HISTORY_PREFIX + "recovery.store.leveldb.path";
/** Whether to use fixed ports with the minicluster. */
public static final String MR_HISTORY_MINICLUSTER_FIXED_PORTS = MR_HISTORY_PREFIX
+ "minicluster.fixed.ports";

View File

@ -1568,6 +1568,14 @@
storage class.</description>
</property>
<property>
<name>mapreduce.jobhistory.recovery.store.leveldb.path</name>
<value>${hadoop.tmp.dir}/mapred/history/recoverystore</value>
<description>The URI where history server state will be stored if
HistoryServerLeveldbSystemStateStoreService is configured as the recovery
storage class.</description>
</property>
<property>
<name>mapreduce.jobhistory.http.policy</name>
<value>HTTP_ONLY</value>

View File

@ -63,6 +63,10 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.fusesource.leveldbjni</groupId>
<artifactId>leveldbjni-all</artifactId>
</dependency>
</dependencies>
<build>

View File

@ -0,0 +1,379 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.hs;
import static org.fusesource.leveldbjni.JniDBFactory.asString;
import static org.fusesource.leveldbjni.JniDBFactory.bytes;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.util.Map.Entry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.mapreduce.v2.api.MRDelegationTokenIdentifier;
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
import org.apache.hadoop.yarn.server.utils.LeveldbIterator;
import org.fusesource.leveldbjni.JniDBFactory;
import org.fusesource.leveldbjni.internal.NativeDB;
import org.iq80.leveldb.DB;
import org.iq80.leveldb.DBException;
import org.iq80.leveldb.Logger;
import org.iq80.leveldb.Options;
public class HistoryServerLeveldbStateStoreService extends
HistoryServerStateStoreService {
private static final String DB_NAME = "mr-jhs-state";
private static final String DB_SCHEMA_VERSION_KEY = "jhs-schema-version";
private static final String TOKEN_MASTER_KEY_KEY_PREFIX = "tokens/key_";
private static final String TOKEN_STATE_KEY_PREFIX = "tokens/token_";
private static final Version CURRENT_VERSION_INFO =
Version.newInstance(1, 0);
private DB db;
public static final Log LOG =
LogFactory.getLog(HistoryServerLeveldbStateStoreService.class);
@Override
protected void initStorage(Configuration conf) throws IOException {
}
@Override
protected void startStorage() throws IOException {
Path storeRoot = createStorageDir(getConfig());
Options options = new Options();
options.createIfMissing(false);
options.logger(new LeveldbLogger());
LOG.info("Using state database at " + storeRoot + " for recovery");
File dbfile = new File(storeRoot.toString());
try {
db = JniDBFactory.factory.open(dbfile, options);
} catch (NativeDB.DBException e) {
if (e.isNotFound() || e.getMessage().contains(" does not exist ")) {
LOG.info("Creating state database at " + dbfile);
options.createIfMissing(true);
try {
db = JniDBFactory.factory.open(dbfile, options);
// store version
storeVersion();
} catch (DBException dbErr) {
throw new IOException(dbErr.getMessage(), dbErr);
}
} else {
throw e;
}
}
checkVersion();
}
@Override
protected void closeStorage() throws IOException {
if (db != null) {
db.close();
db = null;
}
}
@Override
public HistoryServerState loadState() throws IOException {
HistoryServerState state = new HistoryServerState();
int numKeys = loadTokenMasterKeys(state);
LOG.info("Recovered " + numKeys + " token master keys");
int numTokens = loadTokens(state);
LOG.info("Recovered " + numTokens + " tokens");
return state;
}
private int loadTokenMasterKeys(HistoryServerState state)
throws IOException {
int numKeys = 0;
LeveldbIterator iter = null;
try {
iter = new LeveldbIterator(db);
iter.seek(bytes(TOKEN_MASTER_KEY_KEY_PREFIX));
while (iter.hasNext()) {
Entry<byte[],byte[]> entry = iter.next();
String key = asString(entry.getKey());
if (!key.startsWith(TOKEN_MASTER_KEY_KEY_PREFIX)) {
break;
}
if (LOG.isDebugEnabled()) {
LOG.debug("Loading master key from " + key);
}
try {
loadTokenMasterKey(state, entry.getValue());
} catch (IOException e) {
throw new IOException("Error loading token master key from " + key,
e);
}
++numKeys;
}
} catch (DBException e) {
throw new IOException(e);
} finally {
if (iter != null) {
iter.close();
}
}
return numKeys;
}
private void loadTokenMasterKey(HistoryServerState state, byte[] data)
throws IOException {
DelegationKey key = new DelegationKey();
DataInputStream in =
new DataInputStream(new ByteArrayInputStream(data));
try {
key.readFields(in);
} finally {
IOUtils.cleanup(LOG, in);
}
state.tokenMasterKeyState.add(key);
}
private int loadTokens(HistoryServerState state) throws IOException {
int numTokens = 0;
LeveldbIterator iter = null;
try {
iter = new LeveldbIterator(db);
iter.seek(bytes(TOKEN_STATE_KEY_PREFIX));
while (iter.hasNext()) {
Entry<byte[],byte[]> entry = iter.next();
String key = asString(entry.getKey());
if (!key.startsWith(TOKEN_STATE_KEY_PREFIX)) {
break;
}
if (LOG.isDebugEnabled()) {
LOG.debug("Loading token from " + key);
}
try {
loadToken(state, entry.getValue());
} catch (IOException e) {
throw new IOException("Error loading token state from " + key, e);
}
++numTokens;
}
} catch (DBException e) {
throw new IOException(e);
} finally {
if (iter != null) {
iter.close();
}
}
return numTokens;
}
private void loadToken(HistoryServerState state, byte[] data)
throws IOException {
MRDelegationTokenIdentifier tokenId = new MRDelegationTokenIdentifier();
long renewDate;
DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
try {
tokenId.readFields(in);
renewDate = in.readLong();
} finally {
IOUtils.cleanup(LOG, in);
}
state.tokenState.put(tokenId, renewDate);
}
@Override
public void storeToken(MRDelegationTokenIdentifier tokenId, Long renewDate)
throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Storing token " + tokenId.getSequenceNumber());
}
ByteArrayOutputStream memStream = new ByteArrayOutputStream();
DataOutputStream dataStream = new DataOutputStream(memStream);
try {
tokenId.write(dataStream);
dataStream.writeLong(renewDate);
dataStream.close();
dataStream = null;
} finally {
IOUtils.cleanup(LOG, dataStream);
}
String dbKey = getTokenDatabaseKey(tokenId);
try {
db.put(bytes(dbKey), memStream.toByteArray());
} catch (DBException e) {
throw new IOException(e);
}
}
@Override
public void updateToken(MRDelegationTokenIdentifier tokenId, Long renewDate)
throws IOException {
storeToken(tokenId, renewDate);
}
@Override
public void removeToken(MRDelegationTokenIdentifier tokenId)
throws IOException {
String dbKey = getTokenDatabaseKey(tokenId);
try {
db.delete(bytes(dbKey));
} catch (DBException e) {
throw new IOException(e);
}
}
private String getTokenDatabaseKey(MRDelegationTokenIdentifier tokenId) {
return TOKEN_STATE_KEY_PREFIX + tokenId.getSequenceNumber();
}
@Override
public void storeTokenMasterKey(DelegationKey masterKey)
throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Storing master key " + masterKey.getKeyId());
}
ByteArrayOutputStream memStream = new ByteArrayOutputStream();
DataOutputStream dataStream = new DataOutputStream(memStream);
try {
masterKey.write(dataStream);
dataStream.close();
dataStream = null;
} finally {
IOUtils.cleanup(LOG, dataStream);
}
String dbKey = getTokenMasterKeyDatabaseKey(masterKey);
try {
db.put(bytes(dbKey), memStream.toByteArray());
} catch (DBException e) {
throw new IOException(e);
}
}
@Override
public void removeTokenMasterKey(DelegationKey masterKey)
throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Removing master key " + masterKey.getKeyId());
}
String dbKey = getTokenMasterKeyDatabaseKey(masterKey);
try {
db.delete(bytes(dbKey));
} catch (DBException e) {
throw new IOException(e);
}
}
private String getTokenMasterKeyDatabaseKey(DelegationKey masterKey) {
return TOKEN_MASTER_KEY_KEY_PREFIX + masterKey.getKeyId();
}
private Path createStorageDir(Configuration conf) throws IOException {
String confPath = conf.get(JHAdminConfig.MR_HS_LEVELDB_STATE_STORE_PATH);
if (confPath == null) {
throw new IOException("No store location directory configured in " +
JHAdminConfig.MR_HS_LEVELDB_STATE_STORE_PATH);
}
Path root = new Path(confPath, DB_NAME);
FileSystem fs = FileSystem.getLocal(conf);
fs.mkdirs(root, new FsPermission((short)0700));
return root;
}
Version loadVersion() throws IOException {
byte[] data = db.get(bytes(DB_SCHEMA_VERSION_KEY));
// if version is not stored previously, treat it as 1.0.
if (data == null || data.length == 0) {
return Version.newInstance(1, 0);
}
Version version =
new VersionPBImpl(VersionProto.parseFrom(data));
return version;
}
private void storeVersion() throws IOException {
dbStoreVersion(CURRENT_VERSION_INFO);
}
void dbStoreVersion(Version state) throws IOException {
String key = DB_SCHEMA_VERSION_KEY;
byte[] data =
((VersionPBImpl) state).getProto().toByteArray();
try {
db.put(bytes(key), data);
} catch (DBException e) {
throw new IOException(e);
}
}
Version getCurrentVersion() {
return CURRENT_VERSION_INFO;
}
/**
* 1) Versioning scheme: major.minor. For e.g. 1.0, 1.1, 1.2...1.25, 2.0 etc.
* 2) Any incompatible change of state-store is a major upgrade, and any
* compatible change of state-store is a minor upgrade.
* 3) Within a minor upgrade, say 1.1 to 1.2:
* overwrite the version info and proceed as normal.
* 4) Within a major upgrade, say 1.2 to 2.0:
* throw exception and indicate user to use a separate upgrade tool to
* upgrade state or remove incompatible old state.
*/
private void checkVersion() throws IOException {
Version loadedVersion = loadVersion();
LOG.info("Loaded state version info " + loadedVersion);
if (loadedVersion.equals(getCurrentVersion())) {
return;
}
if (loadedVersion.isCompatibleTo(getCurrentVersion())) {
LOG.info("Storing state version info " + getCurrentVersion());
storeVersion();
} else {
throw new IOException(
"Incompatible version for state: expecting state version "
+ getCurrentVersion() + ", but loading version " + loadedVersion);
}
}
private static class LeveldbLogger implements Logger {
private static final Log LOG = LogFactory.getLog(LeveldbLogger.class);
@Override
public void log(String message) {
LOG.info(message);
}
}
}

View File

@ -0,0 +1,207 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.hs;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.File;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.v2.api.MRDelegationTokenIdentifier;
import org.apache.hadoop.mapreduce.v2.hs.HistoryServerStateStoreService.HistoryServerState;
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.service.ServiceStateException;
import org.apache.hadoop.yarn.server.records.Version;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class TestHistoryServerLeveldbStateStoreService {
private static final File testDir = new File(
System.getProperty("test.build.data",
System.getProperty("java.io.tmpdir")),
"TestHistoryServerLeveldbSystemStateStoreService");
private Configuration conf;
@Before
public void setup() {
FileUtil.fullyDelete(testDir);
testDir.mkdirs();
conf = new Configuration();
conf.setBoolean(JHAdminConfig.MR_HS_RECOVERY_ENABLE, true);
conf.setClass(JHAdminConfig.MR_HS_STATE_STORE,
HistoryServerLeveldbStateStoreService.class,
HistoryServerStateStoreService.class);
conf.set(JHAdminConfig.MR_HS_LEVELDB_STATE_STORE_PATH,
testDir.getAbsoluteFile().toString());
}
@After
public void cleanup() {
FileUtil.fullyDelete(testDir);
}
private HistoryServerStateStoreService createAndStartStore()
throws IOException {
HistoryServerStateStoreService store =
HistoryServerStateStoreServiceFactory.getStore(conf);
assertTrue("Factory did not create a leveldb store",
store instanceof HistoryServerLeveldbStateStoreService);
store.init(conf);
store.start();
return store;
}
@Test
public void testCheckVersion() throws IOException {
HistoryServerLeveldbStateStoreService store =
new HistoryServerLeveldbStateStoreService();
store.init(conf);
store.start();
// default version
Version defaultVersion = store.getCurrentVersion();
assertEquals(defaultVersion, store.loadVersion());
// compatible version
Version compatibleVersion =
Version.newInstance(defaultVersion.getMajorVersion(),
defaultVersion.getMinorVersion() + 2);
store.dbStoreVersion(compatibleVersion);
assertEquals(compatibleVersion, store.loadVersion());
store.close();
store = new HistoryServerLeveldbStateStoreService();
store.init(conf);
store.start();
// overwrite the compatible version
assertEquals(defaultVersion, store.loadVersion());
// incompatible version
Version incompatibleVersion =
Version.newInstance(defaultVersion.getMajorVersion() + 1,
defaultVersion.getMinorVersion());
store.dbStoreVersion(incompatibleVersion);
store.close();
store = new HistoryServerLeveldbStateStoreService();
try {
store.init(conf);
store.start();
fail("Incompatible version, should have thrown before here.");
} catch (ServiceStateException e) {
assertTrue("Exception message mismatch",
e.getMessage().contains("Incompatible version for state:"));
}
store.close();
}
@Test
public void testTokenStore() throws IOException {
HistoryServerStateStoreService store = createAndStartStore();
// verify initially the store is empty
HistoryServerState state = store.loadState();
assertTrue("token state not empty", state.tokenState.isEmpty());
assertTrue("key state not empty", state.tokenMasterKeyState.isEmpty());
// store a key and some tokens
final DelegationKey key1 = new DelegationKey(1, 2, "keyData1".getBytes());
final MRDelegationTokenIdentifier token1 =
new MRDelegationTokenIdentifier(new Text("tokenOwner1"),
new Text("tokenRenewer1"), new Text("tokenUser1"));
token1.setSequenceNumber(1);
final Long tokenDate1 = 1L;
final MRDelegationTokenIdentifier token2 =
new MRDelegationTokenIdentifier(new Text("tokenOwner2"),
new Text("tokenRenewer2"), new Text("tokenUser2"));
token2.setSequenceNumber(12345678);
final Long tokenDate2 = 87654321L;
store.storeTokenMasterKey(key1);
store.storeToken(token1, tokenDate1);
store.storeToken(token2, tokenDate2);
store.close();
// verify the key and tokens can be recovered
store = createAndStartStore();
state = store.loadState();
assertEquals("incorrect loaded token count", 2, state.tokenState.size());
assertTrue("missing token 1", state.tokenState.containsKey(token1));
assertEquals("incorrect token 1 date", tokenDate1,
state.tokenState.get(token1));
assertTrue("missing token 2", state.tokenState.containsKey(token2));
assertEquals("incorrect token 2 date", tokenDate2,
state.tokenState.get(token2));
assertEquals("incorrect master key count", 1,
state.tokenMasterKeyState.size());
assertTrue("missing master key 1",
state.tokenMasterKeyState.contains(key1));
// store some more keys and tokens, remove the previous key and one
// of the tokens, and renew a previous token
final DelegationKey key2 = new DelegationKey(3, 4, "keyData2".getBytes());
final DelegationKey key3 = new DelegationKey(5, 6, "keyData3".getBytes());
final MRDelegationTokenIdentifier token3 =
new MRDelegationTokenIdentifier(new Text("tokenOwner3"),
new Text("tokenRenewer3"), new Text("tokenUser3"));
token3.setSequenceNumber(12345679);
final Long tokenDate3 = 87654321L;
store.removeToken(token1);
store.storeTokenMasterKey(key2);
final Long newTokenDate2 = 975318642L;
store.updateToken(token2, newTokenDate2);
store.removeTokenMasterKey(key1);
store.storeTokenMasterKey(key3);
store.storeToken(token3, tokenDate3);
store.close();
// verify the new keys and tokens are recovered, the removed key and
// token are no longer present, and the renewed token has the updated
// expiration date
store = createAndStartStore();
state = store.loadState();
assertEquals("incorrect loaded token count", 2, state.tokenState.size());
assertFalse("token 1 not removed", state.tokenState.containsKey(token1));
assertTrue("missing token 2", state.tokenState.containsKey(token2));
assertEquals("incorrect token 2 date", newTokenDate2,
state.tokenState.get(token2));
assertTrue("missing token 3", state.tokenState.containsKey(token3));
assertEquals("incorrect token 3 date", tokenDate3,
state.tokenState.get(token3));
assertEquals("incorrect master key count", 2,
state.tokenMasterKeyState.size());
assertFalse("master key 1 not removed",
state.tokenMasterKeyState.contains(key1));
assertTrue("missing master key 2",
state.tokenMasterKeyState.contains(key2));
assertTrue("missing master key 3",
state.tokenMasterKeyState.contains(key3));
store.close();
}
}