YARN-1239. Modified ResourceManager state-store implementations to start storing version numbers. Contributed by Jian He.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1546229 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
ec5c8a9865
commit
6369c8d819
@ -123,6 +123,9 @@ Release 2.3.0 - UNRELEASED
|
|||||||
YARN-1314. Fixed DistributedShell to not fail with multiple arguments for a
|
YARN-1314. Fixed DistributedShell to not fail with multiple arguments for a
|
||||||
shell command separated by spaces. (Xuan Gong via vinodkv)
|
shell command separated by spaces. (Xuan Gong via vinodkv)
|
||||||
|
|
||||||
|
YARN-1239. Modified ResourceManager state-store implementations to start
|
||||||
|
storing version numbers. (Jian He via vinodkv)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
@ -103,9 +103,9 @@ enum RMAppStateProto {
|
|||||||
|
|
||||||
message ApplicationStateDataProto {
|
message ApplicationStateDataProto {
|
||||||
optional int64 submit_time = 1;
|
optional int64 submit_time = 1;
|
||||||
optional int64 start_time = 2;
|
optional ApplicationSubmissionContextProto application_submission_context = 2;
|
||||||
optional ApplicationSubmissionContextProto application_submission_context = 3;
|
optional string user = 3;
|
||||||
optional string user = 4;
|
optional int64 start_time = 4;
|
||||||
optional RMAppStateProto application_state = 5;
|
optional RMAppStateProto application_state = 5;
|
||||||
optional string diagnostics = 6 [default = "N/A"];
|
optional string diagnostics = 6 [default = "N/A"];
|
||||||
optional int64 finish_time = 7;
|
optional int64 finish_time = 7;
|
||||||
@ -121,3 +121,8 @@ message ApplicationAttemptStateDataProto {
|
|||||||
optional int64 start_time = 7;
|
optional int64 start_time = 7;
|
||||||
optional FinalApplicationStatusProto final_application_status = 8;
|
optional FinalApplicationStatusProto final_application_status = 8;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
message RMStateVersionProto {
|
||||||
|
optional int32 major_version = 1;
|
||||||
|
optional int32 minor_version = 2;
|
||||||
|
}
|
||||||
|
@ -457,6 +457,7 @@ protected void serviceStart() throws Exception {
|
|||||||
|
|
||||||
if(recoveryEnabled) {
|
if(recoveryEnabled) {
|
||||||
try {
|
try {
|
||||||
|
rmStore.checkVersion();
|
||||||
RMState state = rmStore.loadState();
|
RMState state = rmStore.loadState();
|
||||||
recover(state);
|
recover(state);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
|
@ -44,9 +44,12 @@
|
|||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto;
|
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto;
|
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto;
|
||||||
|
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RMStateVersionProto;
|
||||||
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.RMStateVersionPBImpl;
|
||||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
@ -63,7 +66,9 @@ public class FileSystemRMStateStore extends RMStateStore {
|
|||||||
|
|
||||||
public static final Log LOG = LogFactory.getLog(FileSystemRMStateStore.class);
|
public static final Log LOG = LogFactory.getLog(FileSystemRMStateStore.class);
|
||||||
|
|
||||||
private static final String ROOT_DIR_NAME = "FSRMStateRoot";
|
protected static final String ROOT_DIR_NAME = "FSRMStateRoot";
|
||||||
|
protected static final RMStateVersion CURRENT_VERSION_INFO = RMStateVersion
|
||||||
|
.newInstance(1, 0);
|
||||||
|
|
||||||
protected FileSystem fs;
|
protected FileSystem fs;
|
||||||
|
|
||||||
@ -78,7 +83,6 @@ public class FileSystemRMStateStore extends RMStateStore {
|
|||||||
@Override
|
@Override
|
||||||
public synchronized void initInternal(Configuration conf)
|
public synchronized void initInternal(Configuration conf)
|
||||||
throws Exception{
|
throws Exception{
|
||||||
|
|
||||||
fsWorkingPath = new Path(conf.get(YarnConfiguration.FS_RM_STATE_STORE_URI));
|
fsWorkingPath = new Path(conf.get(YarnConfiguration.FS_RM_STATE_STORE_URI));
|
||||||
rootDirPath = new Path(fsWorkingPath, ROOT_DIR_NAME);
|
rootDirPath = new Path(fsWorkingPath, ROOT_DIR_NAME);
|
||||||
rmDTSecretManagerRoot = new Path(rootDirPath, RM_DT_SECRET_MANAGER_ROOT);
|
rmDTSecretManagerRoot = new Path(rootDirPath, RM_DT_SECRET_MANAGER_ROOT);
|
||||||
@ -100,6 +104,36 @@ protected synchronized void closeInternal() throws Exception {
|
|||||||
fs.close();
|
fs.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected RMStateVersion getCurrentVersion() {
|
||||||
|
return CURRENT_VERSION_INFO;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected synchronized RMStateVersion loadVersion() throws Exception {
|
||||||
|
Path versionNodePath = getNodePath(rootDirPath, VERSION_NODE);
|
||||||
|
if (fs.exists(versionNodePath)) {
|
||||||
|
FileStatus status = fs.getFileStatus(versionNodePath);
|
||||||
|
byte[] data = readFile(versionNodePath, status.getLen());
|
||||||
|
RMStateVersion version =
|
||||||
|
new RMStateVersionPBImpl(RMStateVersionProto.parseFrom(data));
|
||||||
|
return version;
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected synchronized void storeVersion() throws Exception {
|
||||||
|
Path versionNodePath = getNodePath(rootDirPath, VERSION_NODE);
|
||||||
|
byte[] data =
|
||||||
|
((RMStateVersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray();
|
||||||
|
if (fs.exists(versionNodePath)) {
|
||||||
|
updateFile(versionNodePath, data);
|
||||||
|
} else {
|
||||||
|
writeFile(versionNodePath, data);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized RMState loadState() throws Exception {
|
public synchronized RMState loadState() throws Exception {
|
||||||
RMState rmState = new RMState();
|
RMState rmState = new RMState();
|
||||||
@ -430,7 +464,7 @@ private void writeFile(Path outputPath, byte[] data) throws Exception {
|
|||||||
fs.rename(tempPath, outputPath);
|
fs.rename(tempPath, outputPath);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void updateFile(Path outputPath, byte[] data) throws Exception {
|
protected void updateFile(Path outputPath, byte[] data) throws Exception {
|
||||||
if (fs.exists(outputPath)) {
|
if (fs.exists(outputPath)) {
|
||||||
deleteFile(outputPath);
|
deleteFile(outputPath);
|
||||||
}
|
}
|
||||||
|
@ -32,6 +32,7 @@
|
|||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||||
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
|
||||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||||
@ -43,12 +44,15 @@
|
|||||||
public class MemoryRMStateStore extends RMStateStore {
|
public class MemoryRMStateStore extends RMStateStore {
|
||||||
|
|
||||||
RMState state = new RMState();
|
RMState state = new RMState();
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public RMState getState() {
|
public RMState getState() {
|
||||||
return state;
|
return state;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void checkVersion() throws Exception {
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized RMState loadState() throws Exception {
|
public synchronized RMState loadState() throws Exception {
|
||||||
// return a copy of the state to allow for modification of the real state
|
// return a copy of the state to allow for modification of the real state
|
||||||
@ -224,4 +228,18 @@ public synchronized void removeRMDTMasterKeyState(DelegationKey delegationKey)
|
|||||||
state.rmSecretManagerState.getMasterKeyState();
|
state.rmSecretManagerState.getMasterKeyState();
|
||||||
rmDTMasterKeyState.remove(delegationKey);
|
rmDTMasterKeyState.remove(delegationKey);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected RMStateVersion loadVersion() throws Exception {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void storeVersion() throws Exception {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected RMStateVersion getCurrentVersion() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -23,6 +23,7 @@
|
|||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.security.token.delegation.DelegationKey;
|
import org.apache.hadoop.security.token.delegation.DelegationKey;
|
||||||
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
|
||||||
|
|
||||||
@ -99,6 +100,27 @@ protected void updateApplicationStateInternal(String appId,
|
|||||||
@Override
|
@Override
|
||||||
protected void updateApplicationAttemptStateInternal(String attemptId,
|
protected void updateApplicationAttemptStateInternal(String attemptId,
|
||||||
ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception {
|
ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void checkVersion() throws Exception {
|
||||||
// Do nothing
|
// Do nothing
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected RMStateVersion loadVersion() throws Exception {
|
||||||
|
// Do nothing
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void storeVersion() throws Exception {
|
||||||
|
// Do nothing
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected RMStateVersion getCurrentVersion() {
|
||||||
|
// Do nothing
|
||||||
|
return null;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -43,18 +43,18 @@
|
|||||||
import org.apache.hadoop.yarn.api.records.Container;
|
import org.apache.hadoop.yarn.api.records.Container;
|
||||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||||
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
|
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.event.AsyncDispatcher;
|
import org.apache.hadoop.yarn.event.AsyncDispatcher;
|
||||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||||
import org.apache.hadoop.yarn.event.EventHandler;
|
import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNewSavedEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRemovedEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRemovedEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNewSavedEvent;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppUpdateSavedEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppUpdateSavedEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
||||||
@ -78,6 +78,7 @@ public abstract class RMStateStore extends AbstractService {
|
|||||||
protected static final String DELEGATION_TOKEN_PREFIX = "RMDelegationToken_";
|
protected static final String DELEGATION_TOKEN_PREFIX = "RMDelegationToken_";
|
||||||
protected static final String DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX =
|
protected static final String DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX =
|
||||||
"RMDTSequenceNumber_";
|
"RMDTSequenceNumber_";
|
||||||
|
protected static final String VERSION_NODE = "RMVersionNode";
|
||||||
|
|
||||||
public static final Log LOG = LogFactory.getLog(RMStateStore.class);
|
public static final Log LOG = LogFactory.getLog(RMStateStore.class);
|
||||||
|
|
||||||
@ -304,7 +305,54 @@ protected void serviceStop() throws Exception {
|
|||||||
* after this
|
* after this
|
||||||
*/
|
*/
|
||||||
protected abstract void closeInternal() throws Exception;
|
protected abstract void closeInternal() throws Exception;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 1) Versioning scheme: major.minor. For e.g. 1.0, 1.1, 1.2...1.25, 2.0 etc.
|
||||||
|
* 2) Any incompatible change of state-store is a major upgrade, and any
|
||||||
|
* compatible change of state-store is a minor upgrade.
|
||||||
|
* 3) If theres's no version, treat it as 1.0.
|
||||||
|
* 4) Within a minor upgrade, say 1.1 to 1.2:
|
||||||
|
* overwrite the version info and proceed as normal.
|
||||||
|
* 5) Within a major upgrade, say 1.2 to 2.0:
|
||||||
|
* throw exception and indicate user to use a separate upgrade tool to
|
||||||
|
* upgrade RM state.
|
||||||
|
*/
|
||||||
|
public void checkVersion() throws Exception {
|
||||||
|
RMStateVersion loadedVersion = loadVersion();
|
||||||
|
LOG.info("Loaded RM state version info " + loadedVersion);
|
||||||
|
if (loadedVersion != null && loadedVersion.equals(getCurrentVersion())) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
// if there is no version info, treat it as 1.0;
|
||||||
|
if (loadedVersion == null) {
|
||||||
|
loadedVersion = RMStateVersion.newInstance(1, 0);
|
||||||
|
}
|
||||||
|
if (loadedVersion.isCompatibleTo(getCurrentVersion())) {
|
||||||
|
LOG.info("Storing RM state version info " + getCurrentVersion());
|
||||||
|
storeVersion();
|
||||||
|
} else {
|
||||||
|
throw new RMStateVersionIncompatibleException(
|
||||||
|
"Expecting RM state version " + getCurrentVersion()
|
||||||
|
+ ", but loading version " + loadedVersion);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Derived class use this method to load the version information from state
|
||||||
|
* store.
|
||||||
|
*/
|
||||||
|
protected abstract RMStateVersion loadVersion() throws Exception;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Derived class use this method to store the version information.
|
||||||
|
*/
|
||||||
|
protected abstract void storeVersion() throws Exception;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the current version of the underlying state store.
|
||||||
|
*/
|
||||||
|
protected abstract RMStateVersion getCurrentVersion();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Blocking API
|
* Blocking API
|
||||||
* The derived class must recover state from the store and return a new
|
* The derived class must recover state from the store and return a new
|
||||||
|
@ -0,0 +1,42 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This exception is thrown by ResourceManager if it's loading an incompatible
|
||||||
|
* version of state from state store on recovery.
|
||||||
|
*/
|
||||||
|
public class RMStateVersionIncompatibleException extends YarnException {
|
||||||
|
|
||||||
|
private static final long serialVersionUID = 1364408L;
|
||||||
|
|
||||||
|
public RMStateVersionIncompatibleException(Throwable cause) {
|
||||||
|
super(cause);
|
||||||
|
}
|
||||||
|
|
||||||
|
public RMStateVersionIncompatibleException(String message) {
|
||||||
|
super(message);
|
||||||
|
}
|
||||||
|
|
||||||
|
public RMStateVersionIncompatibleException(String message, Throwable cause) {
|
||||||
|
super(message, cause);
|
||||||
|
}
|
||||||
|
}
|
@ -33,7 +33,6 @@
|
|||||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.ha.HAServiceProtocol;
|
|
||||||
import org.apache.hadoop.io.DataInputByteBuffer;
|
import org.apache.hadoop.io.DataInputByteBuffer;
|
||||||
import org.apache.hadoop.security.Credentials;
|
import org.apache.hadoop.security.Credentials;
|
||||||
import org.apache.hadoop.security.token.delegation.DelegationKey;
|
import org.apache.hadoop.security.token.delegation.DelegationKey;
|
||||||
@ -41,16 +40,18 @@
|
|||||||
import org.apache.hadoop.util.ZKUtil;
|
import org.apache.hadoop.util.ZKUtil;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.client.RMHAServiceTarget;
|
|
||||||
import org.apache.hadoop.yarn.conf.HAUtil;
|
import org.apache.hadoop.yarn.conf.HAUtil;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto;
|
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto;
|
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto;
|
||||||
|
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RMStateVersionProto;
|
||||||
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.RMStateVersionPBImpl;
|
||||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||||
import org.apache.zookeeper.CreateMode;
|
import org.apache.zookeeper.CreateMode;
|
||||||
import org.apache.zookeeper.KeeperException;
|
import org.apache.zookeeper.KeeperException;
|
||||||
@ -64,9 +65,9 @@
|
|||||||
import org.apache.zookeeper.data.ACL;
|
import org.apache.zookeeper.data.ACL;
|
||||||
import org.apache.zookeeper.data.Id;
|
import org.apache.zookeeper.data.Id;
|
||||||
import org.apache.zookeeper.data.Stat;
|
import org.apache.zookeeper.data.Stat;
|
||||||
|
import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;
|
|
||||||
|
|
||||||
@Private
|
@Private
|
||||||
@Unstable
|
@Unstable
|
||||||
@ -74,7 +75,9 @@ public class ZKRMStateStore extends RMStateStore {
|
|||||||
|
|
||||||
public static final Log LOG = LogFactory.getLog(ZKRMStateStore.class);
|
public static final Log LOG = LogFactory.getLog(ZKRMStateStore.class);
|
||||||
|
|
||||||
private static final String ROOT_ZNODE_NAME = "ZKRMStateRoot";
|
protected static final String ROOT_ZNODE_NAME = "ZKRMStateRoot";
|
||||||
|
protected static final RMStateVersion CURRENT_VERSION_INFO = RMStateVersion
|
||||||
|
.newInstance(1, 0);
|
||||||
private int numRetries;
|
private int numRetries;
|
||||||
|
|
||||||
private String zkHostPort = null;
|
private String zkHostPort = null;
|
||||||
@ -301,6 +304,36 @@ protected synchronized void closeInternal() throws Exception {
|
|||||||
closeZkClients();
|
closeZkClients();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected RMStateVersion getCurrentVersion() {
|
||||||
|
return CURRENT_VERSION_INFO;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected synchronized void storeVersion() throws Exception {
|
||||||
|
String versionNodePath = getNodePath(zkRootNodePath, VERSION_NODE);
|
||||||
|
byte[] data =
|
||||||
|
((RMStateVersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray();
|
||||||
|
if (zkClient.exists(versionNodePath, true) != null) {
|
||||||
|
setDataWithRetries(versionNodePath, data, -1);
|
||||||
|
} else {
|
||||||
|
createWithRetries(versionNodePath, data, zkAcl, CreateMode.PERSISTENT);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected synchronized RMStateVersion loadVersion() throws Exception {
|
||||||
|
String versionNodePath = getNodePath(zkRootNodePath, VERSION_NODE);
|
||||||
|
|
||||||
|
if (zkClient.exists(versionNodePath, true) != null) {
|
||||||
|
byte[] data = getDataWithRetries(versionNodePath, true);
|
||||||
|
RMStateVersion version =
|
||||||
|
new RMStateVersionPBImpl(RMStateVersionProto.parseFrom(data));
|
||||||
|
return version;
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized RMState loadState() throws Exception {
|
public synchronized RMState loadState() throws Exception {
|
||||||
RMState rmState = new RMState();
|
RMState rmState = new RMState();
|
||||||
|
@ -0,0 +1,80 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.recovery.records;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||||
|
import org.apache.hadoop.yarn.util.Records;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The version information of RM state.
|
||||||
|
*/
|
||||||
|
@Private
|
||||||
|
@Unstable
|
||||||
|
public abstract class RMStateVersion {
|
||||||
|
|
||||||
|
public static RMStateVersion newInstance(int majorVersion, int minorVersion) {
|
||||||
|
RMStateVersion version = Records.newRecord(RMStateVersion.class);
|
||||||
|
version.setMajorVersion(majorVersion);
|
||||||
|
version.setMinorVersion(minorVersion);
|
||||||
|
return version;
|
||||||
|
}
|
||||||
|
|
||||||
|
public abstract int getMajorVersion();
|
||||||
|
|
||||||
|
public abstract void setMajorVersion(int majorVersion);
|
||||||
|
|
||||||
|
public abstract int getMinorVersion();
|
||||||
|
|
||||||
|
public abstract void setMinorVersion(int minorVersion);
|
||||||
|
|
||||||
|
public String toString() {
|
||||||
|
return getMajorVersion() + "." + getMinorVersion();
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isCompatibleTo(RMStateVersion version) {
|
||||||
|
return getMajorVersion() == version.getMajorVersion();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode() {
|
||||||
|
final int prime = 31;
|
||||||
|
int result = 1;
|
||||||
|
result = prime * result + getMajorVersion();
|
||||||
|
result = prime * result + getMinorVersion();
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object obj) {
|
||||||
|
if (this == obj)
|
||||||
|
return true;
|
||||||
|
if (obj == null)
|
||||||
|
return false;
|
||||||
|
if (getClass() != obj.getClass())
|
||||||
|
return false;
|
||||||
|
RMStateVersion other = (RMStateVersion) obj;
|
||||||
|
if (this.getMajorVersion() == other.getMajorVersion()
|
||||||
|
&& this.getMinorVersion() == other.getMinorVersion()) {
|
||||||
|
return true;
|
||||||
|
} else {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,76 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RMStateVersionProto;
|
||||||
|
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RMStateVersionProtoOrBuilder;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
|
||||||
|
|
||||||
|
public class RMStateVersionPBImpl extends RMStateVersion {
|
||||||
|
|
||||||
|
RMStateVersionProto proto = RMStateVersionProto.getDefaultInstance();
|
||||||
|
RMStateVersionProto.Builder builder = null;
|
||||||
|
boolean viaProto = false;
|
||||||
|
|
||||||
|
public RMStateVersionPBImpl() {
|
||||||
|
builder = RMStateVersionProto.newBuilder();
|
||||||
|
}
|
||||||
|
|
||||||
|
public RMStateVersionPBImpl(RMStateVersionProto proto) {
|
||||||
|
this.proto = proto;
|
||||||
|
viaProto = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
public RMStateVersionProto getProto() {
|
||||||
|
proto = viaProto ? proto : builder.build();
|
||||||
|
viaProto = true;
|
||||||
|
return proto;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void maybeInitBuilder() {
|
||||||
|
if (viaProto || builder == null) {
|
||||||
|
builder = RMStateVersionProto.newBuilder(proto);
|
||||||
|
}
|
||||||
|
viaProto = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getMajorVersion() {
|
||||||
|
RMStateVersionProtoOrBuilder p = viaProto ? proto : builder;
|
||||||
|
return p.getMajorVersion();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setMajorVersion(int major) {
|
||||||
|
maybeInitBuilder();
|
||||||
|
builder.setMajorVersion(major);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getMinorVersion() {
|
||||||
|
RMStateVersionProtoOrBuilder p = viaProto ? proto : builder;
|
||||||
|
return p.getMinorVersion();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setMinorVersion(int minor) {
|
||||||
|
maybeInitBuilder();
|
||||||
|
builder.setMinorVersion(minor);
|
||||||
|
}
|
||||||
|
}
|
@ -58,6 +58,7 @@
|
|||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMDTSecretManagerState;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMDTSecretManagerState;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||||
@ -106,6 +107,8 @@ public EventHandler getEventHandler() {
|
|||||||
interface RMStateStoreHelper {
|
interface RMStateStoreHelper {
|
||||||
RMStateStore getRMStateStore() throws Exception;
|
RMStateStore getRMStateStore() throws Exception;
|
||||||
boolean isFinalStateValid() throws Exception;
|
boolean isFinalStateValid() throws Exception;
|
||||||
|
void writeVersion(RMStateVersion version) throws Exception;
|
||||||
|
RMStateVersion getCurrentVersion() throws Exception;
|
||||||
}
|
}
|
||||||
|
|
||||||
void waitNotify(TestDispatcher dispatcher) {
|
void waitNotify(TestDispatcher dispatcher) {
|
||||||
@ -379,4 +382,37 @@ private Token<AMRMTokenIdentifier> generateAMRMToken(
|
|||||||
appToken.setService(new Text("appToken service"));
|
appToken.setService(new Text("appToken service"));
|
||||||
return appToken;
|
return appToken;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testCheckVersion(RMStateStoreHelper stateStoreHelper)
|
||||||
|
throws Exception {
|
||||||
|
RMStateStore store = stateStoreHelper.getRMStateStore();
|
||||||
|
store.setRMDispatcher(new TestDispatcher());
|
||||||
|
|
||||||
|
// default version
|
||||||
|
RMStateVersion defaultVersion = stateStoreHelper.getCurrentVersion();
|
||||||
|
store.checkVersion();
|
||||||
|
Assert.assertEquals(defaultVersion, store.loadVersion());
|
||||||
|
|
||||||
|
// compatible version
|
||||||
|
RMStateVersion compatibleVersion =
|
||||||
|
RMStateVersion.newInstance(defaultVersion.getMajorVersion(),
|
||||||
|
defaultVersion.getMinorVersion() + 2);
|
||||||
|
stateStoreHelper.writeVersion(compatibleVersion);
|
||||||
|
Assert.assertEquals(compatibleVersion, store.loadVersion());
|
||||||
|
store.checkVersion();
|
||||||
|
// overwrite the compatible version
|
||||||
|
Assert.assertEquals(defaultVersion, store.loadVersion());
|
||||||
|
|
||||||
|
// incompatible version
|
||||||
|
RMStateVersion incompatibleVersion =
|
||||||
|
RMStateVersion.newInstance(defaultVersion.getMajorVersion() + 2,
|
||||||
|
defaultVersion.getMinorVersion());
|
||||||
|
stateStoreHelper.writeVersion(incompatibleVersion);
|
||||||
|
try {
|
||||||
|
store.checkVersion();
|
||||||
|
Assert.fail("Invalid version, should fail.");
|
||||||
|
} catch (Throwable t) {
|
||||||
|
Assert.assertTrue(t instanceof RMStateVersionIncompatibleException);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -32,6 +32,8 @@
|
|||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.RMStateVersionPBImpl;
|
||||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
@ -42,7 +44,7 @@ public class TestFSRMStateStore extends RMStateStoreTestBase {
|
|||||||
class TestFSRMStateStoreTester implements RMStateStoreHelper {
|
class TestFSRMStateStoreTester implements RMStateStoreHelper {
|
||||||
|
|
||||||
Path workingDirPathURI;
|
Path workingDirPathURI;
|
||||||
FileSystemRMStateStore store;
|
TestFileSystemRMStore store;
|
||||||
MiniDFSCluster cluster;
|
MiniDFSCluster cluster;
|
||||||
|
|
||||||
class TestFileSystemRMStore extends FileSystemRMStateStore {
|
class TestFileSystemRMStore extends FileSystemRMStateStore {
|
||||||
@ -54,6 +56,14 @@ class TestFileSystemRMStore extends FileSystemRMStateStore {
|
|||||||
start();
|
start();
|
||||||
Assert.assertNotNull(fs);
|
Assert.assertNotNull(fs);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Path getVersionNode() {
|
||||||
|
return new Path(new Path(workingDirPathURI, ROOT_DIR_NAME), VERSION_NODE);
|
||||||
|
}
|
||||||
|
|
||||||
|
public RMStateVersion getCurrentVersion() {
|
||||||
|
return CURRENT_VERSION_INFO;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public TestFSRMStateStoreTester(MiniDFSCluster cluster) throws Exception {
|
public TestFSRMStateStoreTester(MiniDFSCluster cluster) throws Exception {
|
||||||
@ -81,6 +91,17 @@ public boolean isFinalStateValid() throws Exception {
|
|||||||
FileStatus[] files = fs.listStatus(workingDirPathURI);
|
FileStatus[] files = fs.listStatus(workingDirPathURI);
|
||||||
return files.length == 1;
|
return files.length == 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void writeVersion(RMStateVersion version) throws Exception {
|
||||||
|
store.updateFile(store.getVersionNode(), ((RMStateVersionPBImpl) version)
|
||||||
|
.getProto().toByteArray());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RMStateVersion getCurrentVersion() throws Exception {
|
||||||
|
return store.getCurrentVersion();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -113,6 +134,7 @@ public void testFSRMStateStore() throws Exception {
|
|||||||
Assert.assertFalse(fileSystemRMStateStore.fsWorkingPath
|
Assert.assertFalse(fileSystemRMStateStore.fsWorkingPath
|
||||||
.getFileSystem(conf).exists(tempAppAttemptFile));
|
.getFileSystem(conf).exists(tempAppAttemptFile));
|
||||||
testRMDTSecretManagerStateStore(fsTester);
|
testRMDTSecretManagerStateStore(fsTester);
|
||||||
|
testCheckVersion(fsTester);
|
||||||
} finally {
|
} finally {
|
||||||
cluster.shutdown();
|
cluster.shutdown();
|
||||||
}
|
}
|
||||||
|
@ -44,6 +44,8 @@
|
|||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService;
|
import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.RMStateVersionPBImpl;
|
||||||
import org.apache.zookeeper.ZooKeeper;
|
import org.apache.zookeeper.ZooKeeper;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
@ -54,7 +56,7 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
|
|||||||
class TestZKRMStateStoreTester implements RMStateStoreHelper {
|
class TestZKRMStateStoreTester implements RMStateStoreHelper {
|
||||||
|
|
||||||
ZooKeeper client;
|
ZooKeeper client;
|
||||||
ZKRMStateStore store;
|
TestZKRMStateStoreInternal store;
|
||||||
|
|
||||||
class TestZKRMStateStoreInternal extends ZKRMStateStore {
|
class TestZKRMStateStoreInternal extends ZKRMStateStore {
|
||||||
|
|
||||||
@ -69,6 +71,14 @@ public TestZKRMStateStoreInternal(Configuration conf, String workingZnode)
|
|||||||
public ZooKeeper getNewZooKeeper() throws IOException {
|
public ZooKeeper getNewZooKeeper() throws IOException {
|
||||||
return client;
|
return client;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public String getVersionNode() {
|
||||||
|
return znodeWorkingPath + "/" + ROOT_ZNODE_NAME + "/" + VERSION_NODE;
|
||||||
|
}
|
||||||
|
|
||||||
|
public RMStateVersion getCurrentVersion() {
|
||||||
|
return CURRENT_VERSION_INFO;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public RMStateStore getRMStateStore() throws Exception {
|
public RMStateStore getRMStateStore() throws Exception {
|
||||||
@ -86,6 +96,17 @@ public boolean isFinalStateValid() throws Exception {
|
|||||||
List<String> nodes = client.getChildren(store.znodeWorkingPath, false);
|
List<String> nodes = client.getChildren(store.znodeWorkingPath, false);
|
||||||
return nodes.size() == 1;
|
return nodes.size() == 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void writeVersion(RMStateVersion version) throws Exception {
|
||||||
|
client.setData(store.getVersionNode(), ((RMStateVersionPBImpl) version)
|
||||||
|
.getProto().toByteArray(), -1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RMStateVersion getCurrentVersion() throws Exception {
|
||||||
|
return store.getCurrentVersion();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -93,6 +114,7 @@ public void testZKRMStateStoreRealZK() throws Exception {
|
|||||||
TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester();
|
TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester();
|
||||||
testRMAppStateStore(zkTester);
|
testRMAppStateStore(zkTester);
|
||||||
testRMDTSecretManagerStateStore(zkTester);
|
testRMDTSecretManagerStateStore(zkTester);
|
||||||
|
testCheckVersion(zkTester);
|
||||||
}
|
}
|
||||||
|
|
||||||
private Configuration createHARMConf(
|
private Configuration createHARMConf(
|
||||||
|
Loading…
Reference in New Issue
Block a user