MAPREDUCE-5963. ShuffleHandler DB schema should be versioned with compatible/incompatible changes. Contributed by Junping Du

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1612652 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jason Darrell Lowe 2014-07-22 19:22:24 +00:00
parent df2abcb372
commit 60b1e835e0
3 changed files with 163 additions and 14 deletions

View File

@ -159,6 +159,9 @@ Release 2.6.0 - UNRELEASED
MAPREDUCE-5971. Move the default options for distcp -p to
DistCpOptionSwitch. (clamb via wang)
MAPREDUCE-5963. ShuffleHandler DB schema should be versioned with
compatible/incompatible changes (Junping Du via jlowe)
OPTIMIZATIONS
BUG FIXES

View File

@ -82,10 +82,13 @@
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.NMDBSchemaVersionProto;
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
import org.apache.hadoop.yarn.server.api.AuxiliaryService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
import org.apache.hadoop.yarn.server.nodemanager.recovery.records.NMDBSchemaVersion;
import org.apache.hadoop.yarn.server.nodemanager.recovery.records.impl.pb.NMDBSchemaVersionPBImpl;
import org.apache.hadoop.yarn.server.utils.LeveldbIterator;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.fusesource.leveldbjni.JniDBFactory;
@ -125,6 +128,7 @@
import org.jboss.netty.util.CharsetUtil;
import org.mortbay.jetty.HttpHeaders;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.protobuf.ByteString;
@ -146,8 +150,9 @@ public class ShuffleHandler extends AuxiliaryService {
Pattern.CASE_INSENSITIVE);
private static final String STATE_DB_NAME = "mapreduce_shuffle_state";
private static final String STATE_DB_SCHEMA_VERSION_KEY = "schema-version";
private static final String STATE_DB_SCHEMA_VERSION = "1.0";
private static final String STATE_DB_SCHEMA_VERSION_KEY = "shuffle-schema-version";
protected static final NMDBSchemaVersion CURRENT_VERSION_INFO =
NMDBSchemaVersion.newInstance(1, 0);
private int port;
private ChannelFactory selector;
@ -466,18 +471,15 @@ private void startStore(Path recoveryRoot) throws IOException {
Path dbPath = new Path(recoveryRoot, STATE_DB_NAME);
LOG.info("Using state database at " + dbPath + " for recovery");
File dbfile = new File(dbPath.toString());
byte[] schemaVersionData;
try {
stateDb = JniDBFactory.factory.open(dbfile, options);
schemaVersionData = stateDb.get(bytes(STATE_DB_SCHEMA_VERSION_KEY));
} catch (NativeDB.DBException e) {
if (e.isNotFound() || e.getMessage().contains(" does not exist ")) {
LOG.info("Creating state database at " + dbfile);
options.createIfMissing(true);
try {
stateDb = JniDBFactory.factory.open(dbfile, options);
schemaVersionData = bytes(STATE_DB_SCHEMA_VERSION);
stateDb.put(bytes(STATE_DB_SCHEMA_VERSION_KEY), schemaVersionData);
storeVersion();
} catch (DBException dbExc) {
throw new IOException("Unable to create state store", dbExc);
}
@ -485,15 +487,69 @@ private void startStore(Path recoveryRoot) throws IOException {
throw e;
}
}
if (schemaVersionData != null) {
String schemaVersion = asString(schemaVersionData);
// only support exact schema matches for now
if (!STATE_DB_SCHEMA_VERSION.equals(schemaVersion)) {
throw new IOException("Incompatible state database schema, found "
+ schemaVersion + " expected " + STATE_DB_SCHEMA_VERSION);
checkVersion();
}
@VisibleForTesting
NMDBSchemaVersion loadVersion() throws IOException {
byte[] data = stateDb.get(bytes(STATE_DB_SCHEMA_VERSION_KEY));
// if version is not stored previously, treat it as 1.0.
if (data == null || data.length == 0) {
return NMDBSchemaVersion.newInstance(1, 0);
}
NMDBSchemaVersion version =
new NMDBSchemaVersionPBImpl(NMDBSchemaVersionProto.parseFrom(data));
return version;
}
private void storeSchemaVersion(NMDBSchemaVersion version) throws IOException {
String key = STATE_DB_SCHEMA_VERSION_KEY;
byte[] data =
((NMDBSchemaVersionPBImpl) version).getProto().toByteArray();
try {
stateDb.put(bytes(key), data);
} catch (DBException e) {
throw new IOException(e.getMessage(), e);
}
}
private void storeVersion() throws IOException {
storeSchemaVersion(CURRENT_VERSION_INFO);
}
// Only used for test
@VisibleForTesting
void storeVersion(NMDBSchemaVersion version) throws IOException {
storeSchemaVersion(version);
}
protected NMDBSchemaVersion getCurrentVersion() {
return CURRENT_VERSION_INFO;
}
/**
* 1) Versioning scheme: major.minor. For e.g. 1.0, 1.1, 1.2...1.25, 2.0 etc.
* 2) Any incompatible change of DB schema is a major upgrade, and any
* compatible change of DB schema is a minor upgrade.
* 3) Within a minor upgrade, say 1.1 to 1.2:
* overwrite the version info and proceed as normal.
* 4) Within a major upgrade, say 1.2 to 2.0:
* throw exception and indicate user to use a separate upgrade tool to
* upgrade shuffle info or remove incompatible old state.
*/
private void checkVersion() throws IOException {
NMDBSchemaVersion loadedVersion = loadVersion();
LOG.info("Loaded state DB schema version info " + loadedVersion);
if (loadedVersion.equals(getCurrentVersion())) {
return;
}
if (loadedVersion.isCompatibleTo(getCurrentVersion())) {
LOG.info("Storing state DB schedma version info " + getCurrentVersion());
storeVersion();
} else {
throw new IOException("State database schema version not found");
throw new IOException(
"Incompatible version for state DB schema: expecting DB schema version "
+ getCurrentVersion() + ", but loading version " + loadedVersion);
}
}

View File

@ -67,6 +67,7 @@
import org.apache.hadoop.metrics2.impl.MetricsSystemImpl;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.service.ServiceStateException;
import org.apache.hadoop.util.PureJavaCrc32;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -74,6 +75,7 @@
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
import org.apache.hadoop.yarn.server.nodemanager.recovery.records.NMDBSchemaVersion;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelHandlerContext;
@ -719,6 +721,94 @@ public void testRecovery() throws IOException {
}
}
@Test
public void testRecoveryFromOtherVersions() throws IOException {
final String user = "someuser";
final ApplicationId appId = ApplicationId.newInstance(12345, 1);
final File tmpDir = new File(System.getProperty("test.build.data",
System.getProperty("java.io.tmpdir")),
TestShuffleHandler.class.getName());
Configuration conf = new Configuration();
conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
conf.setInt(ShuffleHandler.MAX_SHUFFLE_CONNECTIONS, 3);
ShuffleHandler shuffle = new ShuffleHandler();
// emulate aux services startup with recovery enabled
shuffle.setRecoveryPath(new Path(tmpDir.toString()));
tmpDir.mkdirs();
try {
shuffle.init(conf);
shuffle.start();
// setup a shuffle token for an application
DataOutputBuffer outputBuffer = new DataOutputBuffer();
outputBuffer.reset();
Token<JobTokenIdentifier> jt = new Token<JobTokenIdentifier>(
"identifier".getBytes(), "password".getBytes(), new Text(user),
new Text("shuffleService"));
jt.write(outputBuffer);
shuffle.initializeApplication(new ApplicationInitializationContext(user,
appId, ByteBuffer.wrap(outputBuffer.getData(), 0,
outputBuffer.getLength())));
// verify we are authorized to shuffle
int rc = getShuffleResponseCode(shuffle, jt);
Assert.assertEquals(HttpURLConnection.HTTP_OK, rc);
// emulate shuffle handler restart
shuffle.close();
shuffle = new ShuffleHandler();
shuffle.setRecoveryPath(new Path(tmpDir.toString()));
shuffle.init(conf);
shuffle.start();
// verify we are still authorized to shuffle to the old application
rc = getShuffleResponseCode(shuffle, jt);
Assert.assertEquals(HttpURLConnection.HTTP_OK, rc);
NMDBSchemaVersion version = NMDBSchemaVersion.newInstance(1, 0);
Assert.assertEquals(version, shuffle.getCurrentVersion());
// emulate shuffle handler restart with compatible version
NMDBSchemaVersion version11 = NMDBSchemaVersion.newInstance(1, 1);
// update version info before close shuffle
shuffle.storeVersion(version11);
Assert.assertEquals(version11, shuffle.loadVersion());
shuffle.close();
shuffle = new ShuffleHandler();
shuffle.setRecoveryPath(new Path(tmpDir.toString()));
shuffle.init(conf);
shuffle.start();
// shuffle version will be override by CURRENT_VERSION_INFO after restart
// successfully.
Assert.assertEquals(version, shuffle.loadVersion());
// verify we are still authorized to shuffle to the old application
rc = getShuffleResponseCode(shuffle, jt);
Assert.assertEquals(HttpURLConnection.HTTP_OK, rc);
// emulate shuffle handler restart with incompatible version
NMDBSchemaVersion version21 = NMDBSchemaVersion.newInstance(2, 1);
shuffle.storeVersion(version21);
Assert.assertEquals(version21, shuffle.loadVersion());
shuffle.close();
shuffle = new ShuffleHandler();
shuffle.setRecoveryPath(new Path(tmpDir.toString()));
shuffle.init(conf);
try {
shuffle.start();
Assert.fail("Incompatible version, should expect fail here.");
} catch (ServiceStateException e) {
Assert.assertTrue("Exception message mismatch",
e.getMessage().contains("Incompatible version for state DB schema:"));
}
} finally {
if (shuffle != null) {
shuffle.close();
}
FileUtil.fullyDelete(tmpDir);
}
}
private static int getShuffleResponseCode(ShuffleHandler shuffle,
Token<JobTokenIdentifier> jt) throws IOException {
URL url = new URL("http://127.0.0.1:"