diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/exception/FederationStateVersionIncompatibleException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/exception/FederationStateVersionIncompatibleException.java new file mode 100644 index 0000000000..090c280773 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/exception/FederationStateVersionIncompatibleException.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.federation.store.exception; + +import org.apache.hadoop.yarn.exceptions.YarnException; + +public class FederationStateVersionIncompatibleException extends YarnException { + + private static final long serialVersionUID = 1L; + + public FederationStateVersionIncompatibleException(Throwable cause) { + super(cause); + } + + public FederationStateVersionIncompatibleException(String message) { + super(message); + } + + public FederationStateVersionIncompatibleException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java index 273e736e88..4aad86fbb1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java @@ -31,17 +31,18 @@ import java.util.stream.Collectors; import java.util.Comparator; -import org.apache.commons.lang3.NotImplementedException; import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.token.delegation.DelegationKey; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; +import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateVersionIncompatibleException; import org.apache.hadoop.yarn.server.federation.store.metrics.FederationStateStoreClientMetrics; import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest; import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterResponse; @@ -97,6 +98,7 @@ import org.apache.hadoop.yarn.server.federation.store.utils.FederationPolicyStoreInputValidator; import org.apache.hadoop.yarn.server.federation.store.utils.FederationStateStoreUtils; import org.apache.hadoop.yarn.server.records.Version; +import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl; import org.apache.hadoop.yarn.util.MonotonicClock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -116,6 +118,9 @@ public class MemoryFederationStateStore implements FederationStateStore { private int maxAppsInStateStore; private AtomicInteger sequenceNum; private AtomicInteger masterKeyId; + private static final Version CURRENT_VERSION_INFO = Version + .newInstance(1, 1); + private byte[] version; private final MonotonicClock clock = new MonotonicClock(); @@ -134,6 +139,7 @@ public void init(Configuration conf) { YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_MAX_APPLICATIONS); sequenceNum = new AtomicInteger(); masterKeyId = new AtomicInteger(); + version = ((VersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray(); } @Override @@ -367,22 +373,43 @@ public GetSubClusterPoliciesConfigurationsResponse getPoliciesConfigurations( @Override public Version getCurrentVersion() { - throw new NotImplementedException("Code is not implemented"); + return CURRENT_VERSION_INFO; } @Override public Version loadVersion() throws Exception { - throw new NotImplementedException("Code is not implemented"); + if (version != null) { + VersionProto versionProto = VersionProto.parseFrom(version); + return new VersionPBImpl(versionProto); + } + return null; } @Override public void storeVersion() throws Exception { - throw new NotImplementedException("Code is not implemented"); + version = ((VersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray(); } @Override public void checkVersion() throws Exception { - throw new NotImplementedException("Code is not implemented"); + Version loadedVersion = loadVersion(); + LOG.info("Loaded Router State Version Info = {}.", loadedVersion); + Version currentVersion = getCurrentVersion(); + if (loadedVersion != null && loadedVersion.equals(currentVersion)) { + return; + } + // if there is no version info, treat it as CURRENT_VERSION_INFO; + if (loadedVersion == null) { + loadedVersion = currentVersion; + } + if (loadedVersion.isCompatibleTo(currentVersion)) { + LOG.info("Storing Router State Version Info {}.", currentVersion); + storeVersion(); + } else { + throw new FederationStateVersionIncompatibleException( + "Expecting Router state version " + currentVersion + + ", but loading version " + loadedVersion); + } } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestMemoryFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestMemoryFederationStateStore.java index 5548dab1b8..bb7e130b5e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestMemoryFederationStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestMemoryFederationStateStore.java @@ -27,6 +27,8 @@ import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKey; import org.apache.hadoop.yarn.server.federation.store.records.RouterRMDTSecretManagerState; import org.apache.hadoop.yarn.server.federation.store.records.RouterStoreToken; +import org.apache.hadoop.yarn.server.records.Version; +import org.junit.Test; import java.io.IOException; import java.nio.ByteBuffer; @@ -88,4 +90,39 @@ protected void checkRouterStoreToken(RMDelegationTokenIdentifier identifier, assertTrue(tokenIdentifier instanceof RMDelegationTokenIdentifier); assertEquals(identifier, tokenIdentifier); } + + @Test + public void testGetCurrentVersion() { + MemoryFederationStateStore memoryStateStore = + MemoryFederationStateStore.class.cast(this.getStateStore()); + Version version = memoryStateStore.getCurrentVersion(); + assertEquals(version.getMajorVersion(), 1); + assertEquals(version.getMinorVersion(), 1); + } + + @Test + public void testStoreVersion() throws Exception { + MemoryFederationStateStore memoryStateStore = + MemoryFederationStateStore.class.cast(this.getStateStore()); + memoryStateStore.storeVersion(); + Version version = memoryStateStore.getCurrentVersion(); + assertEquals(version.getMajorVersion(), 1); + assertEquals(version.getMinorVersion(), 1); + } + + @Test + public void testLoadVersion() throws Exception { + MemoryFederationStateStore memoryStateStore = + MemoryFederationStateStore.class.cast(this.getStateStore()); + Version version = memoryStateStore.loadVersion(); + assertEquals(version.getMajorVersion(), 1); + assertEquals(version.getMinorVersion(), 1); + } + + @Test + public void testCheckVersion() throws Exception { + MemoryFederationStateStore memoryStateStore = + MemoryFederationStateStore.class.cast(this.getStateStore()); + memoryStateStore.checkVersion(); + } } \ No newline at end of file