From 6369c8d81972a9a0b6ef41f4508fcb60d34e3d78 Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Wed, 27 Nov 2013 23:22:33 +0000 Subject: [PATCH] YARN-1239. Modified ResourceManager state-store implementations to start storing version numbers. Contributed by Jian He. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1546229 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-yarn-project/CHANGES.txt | 3 + ...erver_resourcemanager_service_protos.proto | 11 ++- .../resourcemanager/ResourceManager.java | 1 + .../recovery/FileSystemRMStateStore.java | 40 +++++++++- .../recovery/MemoryRMStateStore.java | 22 ++++- .../recovery/NullRMStateStore.java | 22 +++++ .../recovery/RMStateStore.java | 54 ++++++++++++- .../RMStateVersionIncompatibleException.java | 42 ++++++++++ .../recovery/ZKRMStateStore.java | 41 +++++++++- .../recovery/records/RMStateVersion.java | 80 +++++++++++++++++++ .../records/impl/pb/RMStateVersionPBImpl.java | 76 ++++++++++++++++++ .../recovery/RMStateStoreTestBase.java | 36 +++++++++ .../recovery/TestFSRMStateStore.java | 24 +++++- .../recovery/TestZKRMStateStore.java | 24 +++++- 14 files changed, 459 insertions(+), 17 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateVersionIncompatibleException.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/RMStateVersion.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/RMStateVersionPBImpl.java diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 20cf15bda4..eab6fcf5eb 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ 
b/hadoop-yarn-project/CHANGES.txt @@ -123,6 +123,9 @@ Release 2.3.0 - UNRELEASED YARN-1314. Fixed DistributedShell to not fail with multiple arguments for a shell command separated by spaces. (Xuan Gong via vinodkv) + YARN-1239. Modified ResourceManager state-store implementations to start + storing version numbers. (Jian He via vinodkv) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto index 6fc8232209..df77486ca6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto @@ -103,9 +103,9 @@ enum RMAppStateProto { message ApplicationStateDataProto { optional int64 submit_time = 1; - optional int64 start_time = 2; - optional ApplicationSubmissionContextProto application_submission_context = 3; - optional string user = 4; + optional ApplicationSubmissionContextProto application_submission_context = 2; + optional string user = 3; + optional int64 start_time = 4; optional RMAppStateProto application_state = 5; optional string diagnostics = 6 [default = "N/A"]; optional int64 finish_time = 7; @@ -121,3 +121,8 @@ message ApplicationAttemptStateDataProto { optional int64 start_time = 7; optional FinalApplicationStatusProto final_application_status = 8; } + +message RMStateVersionProto { + optional int32 major_version = 1; + optional int32 minor_version = 2; +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 82a1f64973..3c187f90e0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -457,6 +457,7 @@ protected void serviceStart() throws Exception { if(recoveryEnabled) { try { + rmStore.checkVersion(); RMState state = rmStore.loadState(); recover(state); } catch (Exception e) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java index 46a58fc96a..2ef6bcd62c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java @@ -44,9 +44,12 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RMStateVersionProto; import 
org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.RMStateVersionPBImpl; import org.apache.hadoop.yarn.util.ConverterUtils; import com.google.common.annotations.VisibleForTesting; @@ -63,7 +66,9 @@ public class FileSystemRMStateStore extends RMStateStore { public static final Log LOG = LogFactory.getLog(FileSystemRMStateStore.class); - private static final String ROOT_DIR_NAME = "FSRMStateRoot"; + protected static final String ROOT_DIR_NAME = "FSRMStateRoot"; + protected static final RMStateVersion CURRENT_VERSION_INFO = RMStateVersion + .newInstance(1, 0); protected FileSystem fs; @@ -78,7 +83,6 @@ public class FileSystemRMStateStore extends RMStateStore { @Override public synchronized void initInternal(Configuration conf) throws Exception{ - fsWorkingPath = new Path(conf.get(YarnConfiguration.FS_RM_STATE_STORE_URI)); rootDirPath = new Path(fsWorkingPath, ROOT_DIR_NAME); rmDTSecretManagerRoot = new Path(rootDirPath, RM_DT_SECRET_MANAGER_ROOT); @@ -100,6 +104,36 @@ protected synchronized void closeInternal() throws Exception { fs.close(); } + @Override + protected RMStateVersion getCurrentVersion() { + return CURRENT_VERSION_INFO; + } + + @Override + protected synchronized RMStateVersion loadVersion() throws Exception { + Path versionNodePath = getNodePath(rootDirPath, VERSION_NODE); + if (fs.exists(versionNodePath)) { + FileStatus status = fs.getFileStatus(versionNodePath); + byte[] data = readFile(versionNodePath, status.getLen()); + RMStateVersion version = + new RMStateVersionPBImpl(RMStateVersionProto.parseFrom(data)); + return version; + } + return null; + } + + 
@Override + protected synchronized void storeVersion() throws Exception { + Path versionNodePath = getNodePath(rootDirPath, VERSION_NODE); + byte[] data = + ((RMStateVersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray(); + if (fs.exists(versionNodePath)) { + updateFile(versionNodePath, data); + } else { + writeFile(versionNodePath, data); + } + } + @Override public synchronized RMState loadState() throws Exception { RMState rmState = new RMState(); @@ -430,7 +464,7 @@ private void writeFile(Path outputPath, byte[] data) throws Exception { fs.rename(tempPath, outputPath); } - private void updateFile(Path outputPath, byte[] data) throws Exception { + protected void updateFile(Path outputPath, byte[] data) throws Exception { if (fs.exists(outputPath)) { deleteFile(outputPath); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java index 495c292dd1..d5ff5ededd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java @@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; import 
org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; import org.apache.hadoop.yarn.util.ConverterUtils; @@ -43,12 +44,15 @@ public class MemoryRMStateStore extends RMStateStore { RMState state = new RMState(); - @VisibleForTesting public RMState getState() { return state; } - + + @Override + public void checkVersion() throws Exception { + } + @Override public synchronized RMState loadState() throws Exception { // return a copy of the state to allow for modification of the real state @@ -224,4 +228,18 @@ public synchronized void removeRMDTMasterKeyState(DelegationKey delegationKey) state.rmSecretManagerState.getMasterKeyState(); rmDTMasterKeyState.remove(delegationKey); } + + @Override + protected RMStateVersion loadVersion() throws Exception { + return null; + } + + @Override + protected void storeVersion() throws Exception { + } + + @Override + protected RMStateVersion getCurrentVersion() { + return null; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java index c8ad1c42ca..c212c1fe85 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java @@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.token.delegation.DelegationKey; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; +import 
org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; @@ -99,6 +100,27 @@ protected void updateApplicationStateInternal(String appId, @Override protected void updateApplicationAttemptStateInternal(String attemptId, ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception { + } + + @Override + public void checkVersion() throws Exception { // Do nothing } + + @Override + protected RMStateVersion loadVersion() throws Exception { + // Do nothing + return null; + } + + @Override + protected void storeVersion() throws Exception { + // Do nothing + } + + @Override + protected RMStateVersion getCurrentVersion() { + // Do nothing + return null; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java index b9724d210b..5e0e94429c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java @@ -43,18 +43,18 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; - import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; import 
org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNewSavedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRemovedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNewSavedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppUpdateSavedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; @@ -78,6 +78,7 @@ public abstract class RMStateStore extends AbstractService { protected static final String DELEGATION_TOKEN_PREFIX = "RMDelegationToken_"; protected static final String DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX = "RMDTSequenceNumber_"; + protected static final String VERSION_NODE = "RMVersionNode"; public static final Log LOG = LogFactory.getLog(RMStateStore.class); @@ -304,7 +305,54 @@ protected void serviceStop() throws Exception { * after this */ protected abstract void closeInternal() throws Exception; - + + /** + * 1) Versioning scheme: major.minor. For e.g. 1.0, 1.1, 1.2...1.25, 2.0 etc. + * 2) Any incompatible change of state-store is a major upgrade, and any + * compatible change of state-store is a minor upgrade. + * 3) If there's no version, treat it as 1.0. 
+ * 4) Within a minor upgrade, say 1.1 to 1.2: + * overwrite the version info and proceed as normal. + * 5) Within a major upgrade, say 1.2 to 2.0: + * throw exception and indicate user to use a separate upgrade tool to + * upgrade RM state. + */ + public void checkVersion() throws Exception { + RMStateVersion loadedVersion = loadVersion(); + LOG.info("Loaded RM state version info " + loadedVersion); + if (loadedVersion != null && loadedVersion.equals(getCurrentVersion())) { + return; + } + // if there is no version info, treat it as 1.0; + if (loadedVersion == null) { + loadedVersion = RMStateVersion.newInstance(1, 0); + } + if (loadedVersion.isCompatibleTo(getCurrentVersion())) { + LOG.info("Storing RM state version info " + getCurrentVersion()); + storeVersion(); + } else { + throw new RMStateVersionIncompatibleException( + "Expecting RM state version " + getCurrentVersion() + + ", but loading version " + loadedVersion); + } + } + + /** + * Derived classes use this method to load the version information from state + * store. + */ + protected abstract RMStateVersion loadVersion() throws Exception; + + /** + * Derived classes use this method to store the version information. + */ + protected abstract void storeVersion() throws Exception; + + /** + * Get the current version of the underlying state store. 
+ */ + protected abstract RMStateVersion getCurrentVersion(); + /** * Blocking API * The derived class must recover state from the store and return a new diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateVersionIncompatibleException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateVersionIncompatibleException.java new file mode 100644 index 0000000000..135868f1a5 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateVersionIncompatibleException.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.recovery; + +import org.apache.hadoop.yarn.exceptions.YarnException; + +/** + * This exception is thrown by ResourceManager if it's loading an incompatible + * version of state from state store on recovery. 
+ */ +public class RMStateVersionIncompatibleException extends YarnException { + + private static final long serialVersionUID = 1364408L; + + public RMStateVersionIncompatibleException(Throwable cause) { + super(cause); + } + + public RMStateVersionIncompatibleException(String message) { + super(message); + } + + public RMStateVersionIncompatibleException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java index 45afb4e5d9..1621d8327a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java @@ -33,7 +33,6 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.ha.HAServiceProtocol; import org.apache.hadoop.io.DataInputByteBuffer; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.token.delegation.DelegationKey; @@ -41,16 +40,18 @@ import org.apache.hadoop.util.ZKUtil; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.client.RMHAServiceTarget; import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import 
org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RMStateVersionProto; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.RMStateVersionPBImpl; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; @@ -64,9 +65,9 @@ import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Id; import org.apache.zookeeper.data.Stat; +import org.apache.zookeeper.server.auth.DigestAuthenticationProvider; import com.google.common.annotations.VisibleForTesting; -import org.apache.zookeeper.server.auth.DigestAuthenticationProvider; @Private @Unstable @@ -74,7 +75,9 @@ public class ZKRMStateStore extends RMStateStore { public static final Log LOG = LogFactory.getLog(ZKRMStateStore.class); - private static final String ROOT_ZNODE_NAME = "ZKRMStateRoot"; + protected static final String ROOT_ZNODE_NAME = "ZKRMStateRoot"; + protected static final RMStateVersion CURRENT_VERSION_INFO = RMStateVersion + .newInstance(1, 0); private int numRetries; private String zkHostPort = null; @@ -301,6 +304,36 @@ protected synchronized void closeInternal() throws Exception { closeZkClients(); } + @Override + protected RMStateVersion getCurrentVersion() { + return CURRENT_VERSION_INFO; + } + + @Override + protected 
synchronized void storeVersion() throws Exception { + String versionNodePath = getNodePath(zkRootNodePath, VERSION_NODE); + byte[] data = + ((RMStateVersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray(); + if (zkClient.exists(versionNodePath, true) != null) { + setDataWithRetries(versionNodePath, data, -1); + } else { + createWithRetries(versionNodePath, data, zkAcl, CreateMode.PERSISTENT); + } + } + + @Override + protected synchronized RMStateVersion loadVersion() throws Exception { + String versionNodePath = getNodePath(zkRootNodePath, VERSION_NODE); + + if (zkClient.exists(versionNodePath, true) != null) { + byte[] data = getDataWithRetries(versionNodePath, true); + RMStateVersion version = + new RMStateVersionPBImpl(RMStateVersionProto.parseFrom(data)); + return version; + } + return null; + } + @Override public synchronized RMState loadState() throws Exception { RMState rmState = new RMState(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/RMStateVersion.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/RMStateVersion.java new file mode 100644 index 0000000000..cfee512b5d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/RMStateVersion.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.recovery.records; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.util.Records; + +/** + * The version information of RM state. + */ +@Private +@Unstable +public abstract class RMStateVersion { + + public static RMStateVersion newInstance(int majorVersion, int minorVersion) { + RMStateVersion version = Records.newRecord(RMStateVersion.class); + version.setMajorVersion(majorVersion); + version.setMinorVersion(minorVersion); + return version; + } + + public abstract int getMajorVersion(); + + public abstract void setMajorVersion(int majorVersion); + + public abstract int getMinorVersion(); + + public abstract void setMinorVersion(int minorVersion); + + public String toString() { + return getMajorVersion() + "." 
+ getMinorVersion(); + } + + public boolean isCompatibleTo(RMStateVersion version) { + return getMajorVersion() == version.getMajorVersion(); + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + getMajorVersion(); + result = prime * result + getMinorVersion(); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + RMStateVersion other = (RMStateVersion) obj; + if (this.getMajorVersion() == other.getMajorVersion() + && this.getMinorVersion() == other.getMinorVersion()) { + return true; + } else { + return false; + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/RMStateVersionPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/RMStateVersionPBImpl.java new file mode 100644 index 0000000000..f960413ce6 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/RMStateVersionPBImpl.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb; + +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RMStateVersionProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RMStateVersionProtoOrBuilder; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion; + +public class RMStateVersionPBImpl extends RMStateVersion { + + RMStateVersionProto proto = RMStateVersionProto.getDefaultInstance(); + RMStateVersionProto.Builder builder = null; + boolean viaProto = false; + + public RMStateVersionPBImpl() { + builder = RMStateVersionProto.newBuilder(); + } + + public RMStateVersionPBImpl(RMStateVersionProto proto) { + this.proto = proto; + viaProto = true; + } + + public RMStateVersionProto getProto() { + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = RMStateVersionProto.newBuilder(proto); + } + viaProto = false; + } + + @Override + public int getMajorVersion() { + RMStateVersionProtoOrBuilder p = viaProto ? proto : builder; + return p.getMajorVersion(); + } + + @Override + public void setMajorVersion(int major) { + maybeInitBuilder(); + builder.setMajorVersion(major); + } + + @Override + public int getMinorVersion() { + RMStateVersionProtoOrBuilder p = viaProto ? 
proto : builder; + return p.getMinorVersion(); + } + + @Override + public void setMinorVersion(int minor) { + maybeInitBuilder(); + builder.setMinorVersion(minor); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java index 95c14bfbf6..417fdb147c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java @@ -58,6 +58,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMDTSecretManagerState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; @@ -106,6 +107,8 @@ public EventHandler getEventHandler() { interface RMStateStoreHelper { RMStateStore getRMStateStore() throws Exception; boolean isFinalStateValid() throws Exception; + void writeVersion(RMStateVersion version) throws Exception; + RMStateVersion getCurrentVersion() throws Exception; } void waitNotify(TestDispatcher dispatcher) { @@ -379,4 +382,37 @@ private Token generateAMRMToken( appToken.setService(new Text("appToken 
service")); return appToken; } + + public void testCheckVersion(RMStateStoreHelper stateStoreHelper) + throws Exception { + RMStateStore store = stateStoreHelper.getRMStateStore(); + store.setRMDispatcher(new TestDispatcher()); + + // default version + RMStateVersion defaultVersion = stateStoreHelper.getCurrentVersion(); + store.checkVersion(); + Assert.assertEquals(defaultVersion, store.loadVersion()); + + // compatible version + RMStateVersion compatibleVersion = + RMStateVersion.newInstance(defaultVersion.getMajorVersion(), + defaultVersion.getMinorVersion() + 2); + stateStoreHelper.writeVersion(compatibleVersion); + Assert.assertEquals(compatibleVersion, store.loadVersion()); + store.checkVersion(); + // overwrite the compatible version + Assert.assertEquals(defaultVersion, store.loadVersion()); + + // incompatible version + RMStateVersion incompatibleVersion = + RMStateVersion.newInstance(defaultVersion.getMajorVersion() + 2, + defaultVersion.getMinorVersion()); + stateStoreHelper.writeVersion(incompatibleVersion); + try { + store.checkVersion(); + Assert.fail("Invalid version, should fail."); + } catch (Throwable t) { + Assert.assertTrue(t instanceof RMStateVersionIncompatibleException); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java index a1a6eab3fd..63fe97557c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java @@ -32,6 +32,8 @@ import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.RMStateVersionPBImpl; import org.apache.hadoop.yarn.util.ConverterUtils; import org.junit.Test; @@ -42,7 +44,7 @@ public class TestFSRMStateStore extends RMStateStoreTestBase { class TestFSRMStateStoreTester implements RMStateStoreHelper { Path workingDirPathURI; - FileSystemRMStateStore store; + TestFileSystemRMStore store; MiniDFSCluster cluster; class TestFileSystemRMStore extends FileSystemRMStateStore { @@ -54,6 +56,14 @@ class TestFileSystemRMStore extends FileSystemRMStateStore { start(); Assert.assertNotNull(fs); } + + public Path getVersionNode() { + return new Path(new Path(workingDirPathURI, ROOT_DIR_NAME), VERSION_NODE); + } + + public RMStateVersion getCurrentVersion() { + return CURRENT_VERSION_INFO; + } } public TestFSRMStateStoreTester(MiniDFSCluster cluster) throws Exception { @@ -81,6 +91,17 @@ public boolean isFinalStateValid() throws Exception { FileStatus[] files = fs.listStatus(workingDirPathURI); return files.length == 1; } + + @Override + public void writeVersion(RMStateVersion version) throws Exception { + store.updateFile(store.getVersionNode(), ((RMStateVersionPBImpl) version) + .getProto().toByteArray()); + } + + @Override + public RMStateVersion getCurrentVersion() throws Exception { + return store.getCurrentVersion(); + } } @Test @@ -113,6 +134,7 @@ public void testFSRMStateStore() throws Exception { Assert.assertFalse(fileSystemRMStateStore.fsWorkingPath .getFileSystem(conf).exists(tempAppAttemptFile)); 
testRMDTSecretManagerStateStore(fsTester); + testCheckVersion(fsTester); } finally { cluster.shutdown(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java index 4138cfaec1..eceeecc685 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java @@ -44,6 +44,8 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.RMStateVersionPBImpl; import org.apache.zookeeper.ZooKeeper; import org.junit.Test; @@ -54,7 +56,7 @@ public class TestZKRMStateStore extends RMStateStoreTestBase { class TestZKRMStateStoreTester implements RMStateStoreHelper { ZooKeeper client; - ZKRMStateStore store; + TestZKRMStateStoreInternal store; class TestZKRMStateStoreInternal extends ZKRMStateStore { @@ -69,6 +71,14 @@ public TestZKRMStateStoreInternal(Configuration conf, String workingZnode) public ZooKeeper getNewZooKeeper() throws IOException { return client; } + + public String getVersionNode() { + return znodeWorkingPath + "/" + ROOT_ZNODE_NAME + "/" + VERSION_NODE; + } + + public RMStateVersion getCurrentVersion() { + return CURRENT_VERSION_INFO; + } } public 
RMStateStore getRMStateStore() throws Exception { @@ -86,6 +96,17 @@ public boolean isFinalStateValid() throws Exception { List nodes = client.getChildren(store.znodeWorkingPath, false); return nodes.size() == 1; } + + @Override + public void writeVersion(RMStateVersion version) throws Exception { + client.setData(store.getVersionNode(), ((RMStateVersionPBImpl) version) + .getProto().toByteArray(), -1); + } + + @Override + public RMStateVersion getCurrentVersion() throws Exception { + return store.getCurrentVersion(); + } } @Test @@ -93,6 +114,7 @@ public void testZKRMStateStoreRealZK() throws Exception { TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester(); testRMAppStateStore(zkTester); testRMDTSecretManagerStateStore(zkTester); + testCheckVersion(zkTester); } private Configuration createHARMConf(