YARN-2347. Consolidated RMStateVersion and NMDBSchemaVersion into Version in yarn-server-common. Contributed by Junping Du.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1614838 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Zhijie Shen 2014-07-31 09:27:43 +00:00
parent b8b8f3f5e7
commit 1d6e178144
20 changed files with 126 additions and 283 deletions

View File

@ -82,13 +82,13 @@
import org.apache.hadoop.util.Shell; import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.NMDBSchemaVersionProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto;
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext; import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext; import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
import org.apache.hadoop.yarn.server.api.AuxiliaryService; import org.apache.hadoop.yarn.server.api.AuxiliaryService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
import org.apache.hadoop.yarn.server.nodemanager.recovery.records.NMDBSchemaVersion; import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.nodemanager.recovery.records.impl.pb.NMDBSchemaVersionPBImpl; import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
import org.apache.hadoop.yarn.server.utils.LeveldbIterator; import org.apache.hadoop.yarn.server.utils.LeveldbIterator;
import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.ConverterUtils;
import org.fusesource.leveldbjni.JniDBFactory; import org.fusesource.leveldbjni.JniDBFactory;
@ -151,8 +151,8 @@ public class ShuffleHandler extends AuxiliaryService {
private static final String STATE_DB_NAME = "mapreduce_shuffle_state"; private static final String STATE_DB_NAME = "mapreduce_shuffle_state";
private static final String STATE_DB_SCHEMA_VERSION_KEY = "shuffle-schema-version"; private static final String STATE_DB_SCHEMA_VERSION_KEY = "shuffle-schema-version";
protected static final NMDBSchemaVersion CURRENT_VERSION_INFO = protected static final Version CURRENT_VERSION_INFO =
NMDBSchemaVersion.newInstance(1, 0); Version.newInstance(1, 0);
private int port; private int port;
private ChannelFactory selector; private ChannelFactory selector;
@ -491,21 +491,21 @@ private void startStore(Path recoveryRoot) throws IOException {
} }
@VisibleForTesting @VisibleForTesting
NMDBSchemaVersion loadVersion() throws IOException { Version loadVersion() throws IOException {
byte[] data = stateDb.get(bytes(STATE_DB_SCHEMA_VERSION_KEY)); byte[] data = stateDb.get(bytes(STATE_DB_SCHEMA_VERSION_KEY));
// if version is not stored previously, treat it as 1.0. // if version is not stored previously, treat it as 1.0.
if (data == null || data.length == 0) { if (data == null || data.length == 0) {
return NMDBSchemaVersion.newInstance(1, 0); return Version.newInstance(1, 0);
} }
NMDBSchemaVersion version = Version version =
new NMDBSchemaVersionPBImpl(NMDBSchemaVersionProto.parseFrom(data)); new VersionPBImpl(VersionProto.parseFrom(data));
return version; return version;
} }
private void storeSchemaVersion(NMDBSchemaVersion version) throws IOException { private void storeSchemaVersion(Version version) throws IOException {
String key = STATE_DB_SCHEMA_VERSION_KEY; String key = STATE_DB_SCHEMA_VERSION_KEY;
byte[] data = byte[] data =
((NMDBSchemaVersionPBImpl) version).getProto().toByteArray(); ((VersionPBImpl) version).getProto().toByteArray();
try { try {
stateDb.put(bytes(key), data); stateDb.put(bytes(key), data);
} catch (DBException e) { } catch (DBException e) {
@ -519,11 +519,11 @@ private void storeVersion() throws IOException {
// Only used for test // Only used for test
@VisibleForTesting @VisibleForTesting
void storeVersion(NMDBSchemaVersion version) throws IOException { void storeVersion(Version version) throws IOException {
storeSchemaVersion(version); storeSchemaVersion(version);
} }
protected NMDBSchemaVersion getCurrentVersion() { protected Version getCurrentVersion() {
return CURRENT_VERSION_INFO; return CURRENT_VERSION_INFO;
} }
@ -538,7 +538,7 @@ protected NMDBSchemaVersion getCurrentVersion() {
* upgrade shuffle info or remove incompatible old state. * upgrade shuffle info or remove incompatible old state.
*/ */
private void checkVersion() throws IOException { private void checkVersion() throws IOException {
NMDBSchemaVersion loadedVersion = loadVersion(); Version loadedVersion = loadVersion();
LOG.info("Loaded state DB schema version info " + loadedVersion); LOG.info("Loaded state DB schema version info " + loadedVersion);
if (loadedVersion.equals(getCurrentVersion())) { if (loadedVersion.equals(getCurrentVersion())) {
return; return;

View File

@ -75,7 +75,7 @@
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext; import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext; import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
import org.apache.hadoop.yarn.server.nodemanager.recovery.records.NMDBSchemaVersion; import org.apache.hadoop.yarn.server.records.Version;
import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture; import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.ChannelHandlerContext;
@ -764,11 +764,11 @@ public void testRecoveryFromOtherVersions() throws IOException {
// verify we are still authorized to shuffle to the old application // verify we are still authorized to shuffle to the old application
rc = getShuffleResponseCode(shuffle, jt); rc = getShuffleResponseCode(shuffle, jt);
Assert.assertEquals(HttpURLConnection.HTTP_OK, rc); Assert.assertEquals(HttpURLConnection.HTTP_OK, rc);
NMDBSchemaVersion version = NMDBSchemaVersion.newInstance(1, 0); Version version = Version.newInstance(1, 0);
Assert.assertEquals(version, shuffle.getCurrentVersion()); Assert.assertEquals(version, shuffle.getCurrentVersion());
// emulate shuffle handler restart with compatible version // emulate shuffle handler restart with compatible version
NMDBSchemaVersion version11 = NMDBSchemaVersion.newInstance(1, 1); Version version11 = Version.newInstance(1, 1);
// update version info before close shuffle // update version info before close shuffle
shuffle.storeVersion(version11); shuffle.storeVersion(version11);
Assert.assertEquals(version11, shuffle.loadVersion()); Assert.assertEquals(version11, shuffle.loadVersion());
@ -785,7 +785,7 @@ public void testRecoveryFromOtherVersions() throws IOException {
Assert.assertEquals(HttpURLConnection.HTTP_OK, rc); Assert.assertEquals(HttpURLConnection.HTTP_OK, rc);
// emulate shuffle handler restart with incompatible version // emulate shuffle handler restart with incompatible version
NMDBSchemaVersion version21 = NMDBSchemaVersion.newInstance(2, 1); Version version21 = Version.newInstance(2, 1);
shuffle.storeVersion(version21); shuffle.storeVersion(version21);
Assert.assertEquals(version21, shuffle.loadVersion()); Assert.assertEquals(version21, shuffle.loadVersion());
shuffle.close(); shuffle.close();

View File

@ -74,6 +74,9 @@ Release 2.6.0 - UNRELEASED
YARN-2328. FairScheduler: Verify update and continuous scheduling threads are YARN-2328. FairScheduler: Verify update and continuous scheduling threads are
stopped when the scheduler is stopped. (kasha) stopped when the scheduler is stopped. (kasha)
YARN-2347. Consolidated RMStateVersion and NMDBSchemaVersion into Version in
yarn-server-common. (Junping Du via zjshen)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -130,11 +130,6 @@ message ApplicationAttemptStateDataProto {
optional int32 am_container_exit_status = 9 [default = -1000]; optional int32 am_container_exit_status = 9 [default = -1000];
} }
message RMStateVersionProto {
optional int32 major_version = 1;
optional int32 minor_version = 2;
}
message EpochProto { message EpochProto {
optional int64 epoch = 1; optional int64 epoch = 1;
} }

View File

@ -15,21 +15,26 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.hadoop.yarn.server.nodemanager.recovery.records;
import org.apache.hadoop.classification.InterfaceAudience.Private; package org.apache.hadoop.yarn.server.records;
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.Records;
/** /**
* The version information of DB Schema for NM. * The version information for state get stored in YARN components,
* i.e. RMState, NMState, etc., which include: majorVersion and
* minorVersion.
* The major version update means incompatible changes happen while
* minor version update indicates compatible changes.
*/ */
@Private @LimitedPrivate({"YARN", "MapReduce"})
@Unstable @Unstable
public abstract class NMDBSchemaVersion { public abstract class Version {
public static NMDBSchemaVersion newInstance(int majorVersion, int minorVersion) { public static Version newInstance(int majorVersion, int minorVersion) {
NMDBSchemaVersion version = Records.newRecord(NMDBSchemaVersion.class); Version version = Records.newRecord(Version.class);
version.setMajorVersion(majorVersion); version.setMajorVersion(majorVersion);
version.setMinorVersion(minorVersion); version.setMinorVersion(minorVersion);
return version; return version;
@ -47,7 +52,7 @@ public String toString() {
return getMajorVersion() + "." + getMinorVersion(); return getMajorVersion() + "." + getMinorVersion();
} }
public boolean isCompatibleTo(NMDBSchemaVersion version) { public boolean isCompatibleTo(Version version) {
return getMajorVersion() == version.getMajorVersion(); return getMajorVersion() == version.getMajorVersion();
} }
@ -68,7 +73,7 @@ public boolean equals(Object obj) {
return false; return false;
if (getClass() != obj.getClass()) if (getClass() != obj.getClass())
return false; return false;
NMDBSchemaVersion other = (NMDBSchemaVersion) obj; Version other = (Version) obj;
if (this.getMajorVersion() == other.getMajorVersion() if (this.getMajorVersion() == other.getMajorVersion()
&& this.getMinorVersion() == other.getMinorVersion()) { && this.getMinorVersion() == other.getMinorVersion()) {
return true; return true;
@ -76,5 +81,4 @@ public boolean equals(Object obj) {
return false; return false;
} }
} }
} }

View File

@ -16,28 +16,29 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb; package org.apache.hadoop.yarn.server.records.impl.pb;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RMStateVersionProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RMStateVersionProtoOrBuilder; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProtoOrBuilder;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
public class RMStateVersionPBImpl extends RMStateVersion { import org.apache.hadoop.yarn.server.records.Version;
RMStateVersionProto proto = RMStateVersionProto.getDefaultInstance(); public class VersionPBImpl extends Version {
RMStateVersionProto.Builder builder = null;
VersionProto proto = VersionProto.getDefaultInstance();
VersionProto.Builder builder = null;
boolean viaProto = false; boolean viaProto = false;
public RMStateVersionPBImpl() { public VersionPBImpl() {
builder = RMStateVersionProto.newBuilder(); builder = VersionProto.newBuilder();
} }
public RMStateVersionPBImpl(RMStateVersionProto proto) { public VersionPBImpl(VersionProto proto) {
this.proto = proto; this.proto = proto;
viaProto = true; viaProto = true;
} }
public RMStateVersionProto getProto() { public VersionProto getProto() {
proto = viaProto ? proto : builder.build(); proto = viaProto ? proto : builder.build();
viaProto = true; viaProto = true;
return proto; return proto;
@ -45,14 +46,14 @@ public RMStateVersionProto getProto() {
private void maybeInitBuilder() { private void maybeInitBuilder() {
if (viaProto || builder == null) { if (viaProto || builder == null) {
builder = RMStateVersionProto.newBuilder(proto); builder = VersionProto.newBuilder(proto);
} }
viaProto = false; viaProto = false;
} }
@Override @Override
public int getMajorVersion() { public int getMajorVersion() {
RMStateVersionProtoOrBuilder p = viaProto ? proto : builder; VersionProtoOrBuilder p = viaProto ? proto : builder;
return p.getMajorVersion(); return p.getMajorVersion();
} }
@ -64,7 +65,7 @@ public void setMajorVersion(int major) {
@Override @Override
public int getMinorVersion() { public int getMinorVersion() {
RMStateVersionProtoOrBuilder p = viaProto ? proto : builder; VersionProtoOrBuilder p = viaProto ? proto : builder;
return p.getMinorVersion(); return p.getMinorVersion();
} }

View File

@ -47,4 +47,10 @@ message NodeHealthStatusProto {
optional bool is_node_healthy = 1; optional bool is_node_healthy = 1;
optional string health_report = 2; optional string health_report = 2;
optional int64 last_health_report_time = 3; optional int64 last_health_report_time = 3;
} }
message VersionProto {
optional int32 major_version = 1;
optional int32 minor_version = 2;
}

View File

@ -41,13 +41,13 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.NMDBSchemaVersionProto;
import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl; import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
import org.apache.hadoop.yarn.server.nodemanager.recovery.records.NMDBSchemaVersion; import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.nodemanager.recovery.records.impl.pb.NMDBSchemaVersionPBImpl; import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
import org.apache.hadoop.yarn.server.utils.LeveldbIterator; import org.apache.hadoop.yarn.server.utils.LeveldbIterator;
import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.ConverterUtils;
import org.fusesource.leveldbjni.JniDBFactory; import org.fusesource.leveldbjni.JniDBFactory;
@ -68,7 +68,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
private static final String DB_NAME = "yarn-nm-state"; private static final String DB_NAME = "yarn-nm-state";
private static final String DB_SCHEMA_VERSION_KEY = "nm-schema-version"; private static final String DB_SCHEMA_VERSION_KEY = "nm-schema-version";
private static final NMDBSchemaVersion CURRENT_VERSION_INFO = NMDBSchemaVersion private static final Version CURRENT_VERSION_INFO = Version
.newInstance(1, 0); .newInstance(1, 0);
private static final String DELETION_TASK_KEY_PREFIX = private static final String DELETION_TASK_KEY_PREFIX =
@ -617,14 +617,14 @@ public void log(String message) {
} }
NMDBSchemaVersion loadVersion() throws IOException { Version loadVersion() throws IOException {
byte[] data = db.get(bytes(DB_SCHEMA_VERSION_KEY)); byte[] data = db.get(bytes(DB_SCHEMA_VERSION_KEY));
// if version is not stored previously, treat it as 1.0. // if version is not stored previously, treat it as 1.0.
if (data == null || data.length == 0) { if (data == null || data.length == 0) {
return NMDBSchemaVersion.newInstance(1, 0); return Version.newInstance(1, 0);
} }
NMDBSchemaVersion version = Version version =
new NMDBSchemaVersionPBImpl(NMDBSchemaVersionProto.parseFrom(data)); new VersionPBImpl(VersionProto.parseFrom(data));
return version; return version;
} }
@ -634,14 +634,14 @@ private void storeVersion() throws IOException {
// Only used for test // Only used for test
@VisibleForTesting @VisibleForTesting
void storeVersion(NMDBSchemaVersion state) throws IOException { void storeVersion(Version state) throws IOException {
dbStoreVersion(state); dbStoreVersion(state);
} }
private void dbStoreVersion(NMDBSchemaVersion state) throws IOException { private void dbStoreVersion(Version state) throws IOException {
String key = DB_SCHEMA_VERSION_KEY; String key = DB_SCHEMA_VERSION_KEY;
byte[] data = byte[] data =
((NMDBSchemaVersionPBImpl) state).getProto().toByteArray(); ((VersionPBImpl) state).getProto().toByteArray();
try { try {
db.put(bytes(key), data); db.put(bytes(key), data);
} catch (DBException e) { } catch (DBException e) {
@ -649,7 +649,7 @@ private void dbStoreVersion(NMDBSchemaVersion state) throws IOException {
} }
} }
NMDBSchemaVersion getCurrentVersion() { Version getCurrentVersion() {
return CURRENT_VERSION_INFO; return CURRENT_VERSION_INFO;
} }
@ -664,9 +664,9 @@ NMDBSchemaVersion getCurrentVersion() {
* upgrade NM state or remove incompatible old state. * upgrade NM state or remove incompatible old state.
*/ */
private void checkVersion() throws IOException { private void checkVersion() throws IOException {
NMDBSchemaVersion loadedVersion = loadVersion(); Version loadedVersion = loadVersion();
LOG.info("Loaded NM state version info " + loadedVersion); LOG.info("Loaded NM state version info " + loadedVersion);
if (loadedVersion != null && loadedVersion.equals(getCurrentVersion())) { if (loadedVersion.equals(getCurrentVersion())) {
return; return;
} }
if (loadedVersion.isCompatibleTo(getCurrentVersion())) { if (loadedVersion.isCompatibleTo(getCurrentVersion())) {

View File

@ -1,81 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.recovery.records.impl.pb;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.NMDBSchemaVersionProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.NMDBSchemaVersionProtoOrBuilder;
import org.apache.hadoop.yarn.server.nodemanager.recovery.records.NMDBSchemaVersion;
@Private
@Evolving
public class NMDBSchemaVersionPBImpl extends NMDBSchemaVersion {
NMDBSchemaVersionProto proto = NMDBSchemaVersionProto.getDefaultInstance();
NMDBSchemaVersionProto.Builder builder = null;
boolean viaProto = false;
public NMDBSchemaVersionPBImpl() {
builder = NMDBSchemaVersionProto.newBuilder();
}
public NMDBSchemaVersionPBImpl(NMDBSchemaVersionProto proto) {
this.proto = proto;
viaProto = true;
}
public NMDBSchemaVersionProto getProto() {
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
private void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = NMDBSchemaVersionProto.newBuilder(proto);
}
viaProto = false;
}
@Override
public int getMajorVersion() {
NMDBSchemaVersionProtoOrBuilder p = viaProto ? proto : builder;
return p.getMajorVersion();
}
@Override
public void setMajorVersion(int majorVersion) {
maybeInitBuilder();
builder.setMajorVersion(majorVersion);
}
@Override
public int getMinorVersion() {
NMDBSchemaVersionProtoOrBuilder p = viaProto ? proto : builder;
return p.getMinorVersion();
}
@Override
public void setMinorVersion(int minorVersion) {
maybeInitBuilder();
builder.setMinorVersion(minorVersion);
}
}

View File

@ -39,8 +39,3 @@ message LocalizedResourceProto {
optional int64 size = 3; optional int64 size = 3;
} }
message NMDBSchemaVersionProto {
optional int32 majorVersion = 1;
optional int32 minorVersion = 2;
}

View File

@ -49,7 +49,7 @@
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredLocalizationState; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredLocalizationState;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredNMTokensState; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredNMTokensState;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredUserResources; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredUserResources;
import org.apache.hadoop.yarn.server.nodemanager.recovery.records.NMDBSchemaVersion; import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager; import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.security.BaseNMTokenSecretManager; import org.apache.hadoop.yarn.server.security.BaseNMTokenSecretManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.BuilderUtils;
@ -114,12 +114,12 @@ public void testEmptyState() throws IOException {
@Test @Test
public void testCheckVersion() throws IOException { public void testCheckVersion() throws IOException {
// default version // default version
NMDBSchemaVersion defaultVersion = stateStore.getCurrentVersion(); Version defaultVersion = stateStore.getCurrentVersion();
Assert.assertEquals(defaultVersion, stateStore.loadVersion()); Assert.assertEquals(defaultVersion, stateStore.loadVersion());
// compatible version // compatible version
NMDBSchemaVersion compatibleVersion = Version compatibleVersion =
NMDBSchemaVersion.newInstance(defaultVersion.getMajorVersion(), Version.newInstance(defaultVersion.getMajorVersion(),
defaultVersion.getMinorVersion() + 2); defaultVersion.getMinorVersion() + 2);
stateStore.storeVersion(compatibleVersion); stateStore.storeVersion(compatibleVersion);
Assert.assertEquals(compatibleVersion, stateStore.loadVersion()); Assert.assertEquals(compatibleVersion, stateStore.loadVersion());
@ -128,8 +128,8 @@ public void testCheckVersion() throws IOException {
Assert.assertEquals(defaultVersion, stateStore.loadVersion()); Assert.assertEquals(defaultVersion, stateStore.loadVersion());
// incompatible version // incompatible version
NMDBSchemaVersion incompatibleVersion = Version incompatibleVersion =
NMDBSchemaVersion.newInstance(defaultVersion.getMajorVersion() + 1, Version.newInstance(defaultVersion.getMajorVersion() + 1,
defaultVersion.getMinorVersion()); defaultVersion.getMinorVersion());
stateStore.storeVersion(incompatibleVersion); stateStore.storeVersion(incompatibleVersion);
try { try {

View File

@ -44,22 +44,22 @@
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.AMRMTokenSecretManagerStateProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.AMRMTokenSecretManagerStateProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.EpochProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.EpochProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RMStateVersionProto;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.Epoch; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.Epoch;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.AMRMTokenSecretManagerStatePBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.AMRMTokenSecretManagerStatePBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.EpochPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.EpochPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.RMStateVersionPBImpl;
import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.ConverterUtils;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
@ -77,7 +77,7 @@ public class FileSystemRMStateStore extends RMStateStore {
public static final Log LOG = LogFactory.getLog(FileSystemRMStateStore.class); public static final Log LOG = LogFactory.getLog(FileSystemRMStateStore.class);
protected static final String ROOT_DIR_NAME = "FSRMStateRoot"; protected static final String ROOT_DIR_NAME = "FSRMStateRoot";
protected static final RMStateVersion CURRENT_VERSION_INFO = RMStateVersion protected static final Version CURRENT_VERSION_INFO = Version
.newInstance(1, 1); .newInstance(1, 1);
protected static final String AMRMTOKEN_SECRET_MANAGER_NODE = protected static final String AMRMTOKEN_SECRET_MANAGER_NODE =
"AMRMTokenSecretManagerNode"; "AMRMTokenSecretManagerNode";
@ -130,18 +130,18 @@ protected synchronized void closeInternal() throws Exception {
} }
@Override @Override
protected RMStateVersion getCurrentVersion() { protected Version getCurrentVersion() {
return CURRENT_VERSION_INFO; return CURRENT_VERSION_INFO;
} }
@Override @Override
protected synchronized RMStateVersion loadVersion() throws Exception { protected synchronized Version loadVersion() throws Exception {
Path versionNodePath = getNodePath(rootDirPath, VERSION_NODE); Path versionNodePath = getNodePath(rootDirPath, VERSION_NODE);
if (fs.exists(versionNodePath)) { if (fs.exists(versionNodePath)) {
FileStatus status = fs.getFileStatus(versionNodePath); FileStatus status = fs.getFileStatus(versionNodePath);
byte[] data = readFile(versionNodePath, status.getLen()); byte[] data = readFile(versionNodePath, status.getLen());
RMStateVersion version = Version version =
new RMStateVersionPBImpl(RMStateVersionProto.parseFrom(data)); new VersionPBImpl(VersionProto.parseFrom(data));
return version; return version;
} }
return null; return null;
@ -151,7 +151,7 @@ protected synchronized RMStateVersion loadVersion() throws Exception {
protected synchronized void storeVersion() throws Exception { protected synchronized void storeVersion() throws Exception {
Path versionNodePath = getNodePath(rootDirPath, VERSION_NODE); Path versionNodePath = getNodePath(rootDirPath, VERSION_NODE);
byte[] data = byte[] data =
((RMStateVersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray(); ((VersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray();
if (fs.exists(versionNodePath)) { if (fs.exists(versionNodePath)) {
updateFile(versionNodePath, data); updateFile(versionNodePath, data);
} else { } else {

View File

@ -32,10 +32,10 @@
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
@ -259,7 +259,7 @@ public synchronized void removeRMDTMasterKeyState(DelegationKey delegationKey)
} }
@Override @Override
protected RMStateVersion loadVersion() throws Exception { protected Version loadVersion() throws Exception {
return null; return null;
} }
@ -268,7 +268,7 @@ protected void storeVersion() throws Exception {
} }
@Override @Override
protected RMStateVersion getCurrentVersion() { protected Version getCurrentVersion() {
return null; return null;
} }

View File

@ -25,10 +25,10 @@
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
@Unstable @Unstable
public class NullRMStateStore extends RMStateStore { public class NullRMStateStore extends RMStateStore {
@ -123,7 +123,7 @@ public void checkVersion() throws Exception {
} }
@Override @Override
protected RMStateVersion loadVersion() throws Exception { protected Version loadVersion() throws Exception {
// Do nothing // Do nothing
return null; return null;
} }
@ -134,7 +134,7 @@ protected void storeVersion() throws Exception {
} }
@Override @Override
protected RMStateVersion getCurrentVersion() { protected Version getCurrentVersion() {
// Do nothing // Do nothing
return null; return null;
} }

View File

@ -47,12 +47,12 @@
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEvent; import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEvent;
import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEventType; import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEventType;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNewSavedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNewSavedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
@ -493,14 +493,14 @@ protected void serviceStop() throws Exception {
* upgrade RM state. * upgrade RM state.
*/ */
public void checkVersion() throws Exception { public void checkVersion() throws Exception {
RMStateVersion loadedVersion = loadVersion(); Version loadedVersion = loadVersion();
LOG.info("Loaded RM state version info " + loadedVersion); LOG.info("Loaded RM state version info " + loadedVersion);
if (loadedVersion != null && loadedVersion.equals(getCurrentVersion())) { if (loadedVersion != null && loadedVersion.equals(getCurrentVersion())) {
return; return;
} }
// if there is no version info, treat it as 1.0; // if there is no version info, treat it as 1.0;
if (loadedVersion == null) { if (loadedVersion == null) {
loadedVersion = RMStateVersion.newInstance(1, 0); loadedVersion = Version.newInstance(1, 0);
} }
if (loadedVersion.isCompatibleTo(getCurrentVersion())) { if (loadedVersion.isCompatibleTo(getCurrentVersion())) {
LOG.info("Storing RM state version info " + getCurrentVersion()); LOG.info("Storing RM state version info " + getCurrentVersion());
@ -516,7 +516,7 @@ public void checkVersion() throws Exception {
* Derived class use this method to load the version information from state * Derived class use this method to load the version information from state
* store. * store.
*/ */
protected abstract RMStateVersion loadVersion() throws Exception; protected abstract Version loadVersion() throws Exception;
/** /**
* Derived class use this method to store the version information. * Derived class use this method to store the version information.
@ -526,7 +526,7 @@ public void checkVersion() throws Exception {
/** /**
* Get the current version of the underlying state store. * Get the current version of the underlying state store.
*/ */
protected abstract RMStateVersion getCurrentVersion(); protected abstract Version getCurrentVersion();
/** /**

View File

@ -44,23 +44,23 @@
import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.AMRMTokenSecretManagerStateProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.AMRMTokenSecretManagerStateProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RMStateVersionProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.EpochProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.EpochProto;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.resourcemanager.RMZKUtils; import org.apache.hadoop.yarn.server.resourcemanager.RMZKUtils;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.Epoch; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.Epoch;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.AMRMTokenSecretManagerStatePBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.AMRMTokenSecretManagerStatePBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.EpochPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.EpochPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.RMStateVersionPBImpl;
import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException;
@ -86,7 +86,7 @@ public class ZKRMStateStore extends RMStateStore {
private final SecureRandom random = new SecureRandom(); private final SecureRandom random = new SecureRandom();
protected static final String ROOT_ZNODE_NAME = "ZKRMStateRoot"; protected static final String ROOT_ZNODE_NAME = "ZKRMStateRoot";
protected static final RMStateVersion CURRENT_VERSION_INFO = RMStateVersion protected static final Version CURRENT_VERSION_INFO = Version
.newInstance(1, 1); .newInstance(1, 1);
private static final String RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME = private static final String RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME =
"RMDelegationTokensRoot"; "RMDelegationTokensRoot";
@ -377,7 +377,7 @@ protected synchronized void closeInternal() throws Exception {
} }
@Override @Override
protected RMStateVersion getCurrentVersion() { protected Version getCurrentVersion() {
return CURRENT_VERSION_INFO; return CURRENT_VERSION_INFO;
} }
@ -385,7 +385,7 @@ protected RMStateVersion getCurrentVersion() {
protected synchronized void storeVersion() throws Exception { protected synchronized void storeVersion() throws Exception {
String versionNodePath = getNodePath(zkRootNodePath, VERSION_NODE); String versionNodePath = getNodePath(zkRootNodePath, VERSION_NODE);
byte[] data = byte[] data =
((RMStateVersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray(); ((VersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray();
if (existsWithRetries(versionNodePath, true) != null) { if (existsWithRetries(versionNodePath, true) != null) {
setDataWithRetries(versionNodePath, data, -1); setDataWithRetries(versionNodePath, data, -1);
} else { } else {
@ -394,13 +394,13 @@ protected synchronized void storeVersion() throws Exception {
} }
@Override @Override
protected synchronized RMStateVersion loadVersion() throws Exception { protected synchronized Version loadVersion() throws Exception {
String versionNodePath = getNodePath(zkRootNodePath, VERSION_NODE); String versionNodePath = getNodePath(zkRootNodePath, VERSION_NODE);
if (existsWithRetries(versionNodePath, true) != null) { if (existsWithRetries(versionNodePath, true) != null) {
byte[] data = getDataWithRetries(versionNodePath, true); byte[] data = getDataWithRetries(versionNodePath, true);
RMStateVersion version = Version version =
new RMStateVersionPBImpl(RMStateVersionProto.parseFrom(data)); new VersionPBImpl(VersionProto.parseFrom(data));
return version; return version;
} }
return null; return null;

View File

@ -1,80 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.recovery.records;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.util.Records;
/**
* The version information of RM state.
*/
@Private
@Unstable
public abstract class RMStateVersion {
public static RMStateVersion newInstance(int majorVersion, int minorVersion) {
RMStateVersion version = Records.newRecord(RMStateVersion.class);
version.setMajorVersion(majorVersion);
version.setMinorVersion(minorVersion);
return version;
}
public abstract int getMajorVersion();
public abstract void setMajorVersion(int majorVersion);
public abstract int getMinorVersion();
public abstract void setMinorVersion(int minorVersion);
public String toString() {
return getMajorVersion() + "." + getMinorVersion();
}
public boolean isCompatibleTo(RMStateVersion version) {
return getMajorVersion() == version.getMajorVersion();
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + getMajorVersion();
result = prime * result + getMinorVersion();
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
RMStateVersion other = (RMStateVersion) obj;
if (this.getMajorVersion() == other.getMajorVersion()
&& this.getMinorVersion() == other.getMinorVersion()) {
return true;
} else {
return false;
}
}
}

View File

@ -55,13 +55,13 @@
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMDTSecretManagerState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMDTSecretManagerState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@ -111,8 +111,8 @@ public EventHandler getEventHandler() {
interface RMStateStoreHelper { interface RMStateStoreHelper {
RMStateStore getRMStateStore() throws Exception; RMStateStore getRMStateStore() throws Exception;
boolean isFinalStateValid() throws Exception; boolean isFinalStateValid() throws Exception;
void writeVersion(RMStateVersion version) throws Exception; void writeVersion(Version version) throws Exception;
RMStateVersion getCurrentVersion() throws Exception; Version getCurrentVersion() throws Exception;
boolean appExists(RMApp app) throws Exception; boolean appExists(RMApp app) throws Exception;
} }
@ -477,13 +477,13 @@ public void testCheckVersion(RMStateStoreHelper stateStoreHelper)
store.setRMDispatcher(new TestDispatcher()); store.setRMDispatcher(new TestDispatcher());
// default version // default version
RMStateVersion defaultVersion = stateStoreHelper.getCurrentVersion(); Version defaultVersion = stateStoreHelper.getCurrentVersion();
store.checkVersion(); store.checkVersion();
Assert.assertEquals(defaultVersion, store.loadVersion()); Assert.assertEquals(defaultVersion, store.loadVersion());
// compatible version // compatible version
RMStateVersion compatibleVersion = Version compatibleVersion =
RMStateVersion.newInstance(defaultVersion.getMajorVersion(), Version.newInstance(defaultVersion.getMajorVersion(),
defaultVersion.getMinorVersion() + 2); defaultVersion.getMinorVersion() + 2);
stateStoreHelper.writeVersion(compatibleVersion); stateStoreHelper.writeVersion(compatibleVersion);
Assert.assertEquals(compatibleVersion, store.loadVersion()); Assert.assertEquals(compatibleVersion, store.loadVersion());
@ -492,8 +492,8 @@ public void testCheckVersion(RMStateStoreHelper stateStoreHelper)
Assert.assertEquals(defaultVersion, store.loadVersion()); Assert.assertEquals(defaultVersion, store.loadVersion());
// incompatible version // incompatible version
RMStateVersion incompatibleVersion = Version incompatibleVersion =
RMStateVersion.newInstance(defaultVersion.getMajorVersion() + 2, Version.newInstance(defaultVersion.getMajorVersion() + 2,
defaultVersion.getMinorVersion()); defaultVersion.getMinorVersion());
stateStoreHelper.writeVersion(incompatibleVersion); stateStoreHelper.writeVersion(incompatibleVersion);
try { try {

View File

@ -36,9 +36,9 @@
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.RMStateVersionPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.ConverterUtils;
@ -70,7 +70,7 @@ public Path getVersionNode() {
return new Path(new Path(workingDirPathURI, ROOT_DIR_NAME), VERSION_NODE); return new Path(new Path(workingDirPathURI, ROOT_DIR_NAME), VERSION_NODE);
} }
public RMStateVersion getCurrentVersion() { public Version getCurrentVersion() {
return CURRENT_VERSION_INFO; return CURRENT_VERSION_INFO;
} }
@ -111,13 +111,13 @@ public boolean isFinalStateValid() throws Exception {
} }
@Override @Override
public void writeVersion(RMStateVersion version) throws Exception { public void writeVersion(Version version) throws Exception {
store.updateFile(store.getVersionNode(), ((RMStateVersionPBImpl) version) store.updateFile(store.getVersionNode(), ((VersionPBImpl) version)
.getProto().toByteArray()); .getProto().toByteArray());
} }
@Override @Override
public RMStateVersion getCurrentVersion() throws Exception { public Version getCurrentVersion() throws Exception {
return store.getCurrentVersion(); return store.getCurrentVersion();
} }

View File

@ -32,9 +32,9 @@
import org.apache.hadoop.service.Service; import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.RMStateVersionPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.data.Stat;
@ -69,7 +69,7 @@ public String getVersionNode() {
return znodeWorkingPath + "/" + ROOT_ZNODE_NAME + "/" + VERSION_NODE; return znodeWorkingPath + "/" + ROOT_ZNODE_NAME + "/" + VERSION_NODE;
} }
public RMStateVersion getCurrentVersion() { public Version getCurrentVersion() {
return CURRENT_VERSION_INFO; return CURRENT_VERSION_INFO;
} }
@ -96,13 +96,13 @@ public boolean isFinalStateValid() throws Exception {
} }
@Override @Override
public void writeVersion(RMStateVersion version) throws Exception { public void writeVersion(Version version) throws Exception {
client.setData(store.getVersionNode(), ((RMStateVersionPBImpl) version) client.setData(store.getVersionNode(), ((VersionPBImpl) version)
.getProto().toByteArray(), -1); .getProto().toByteArray(), -1);
} }
@Override @Override
public RMStateVersion getCurrentVersion() throws Exception { public Version getCurrentVersion() throws Exception {
return store.getCurrentVersion(); return store.getCurrentVersion();
} }