YARN-9048. Add znode hierarchy in Federation ZK State Store. (#6016)

This commit is contained in:
slfan1989 2023-10-10 05:06:41 +08:00 committed by GitHub
parent 594e9f29f5
commit b00d605832
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 395 additions and 39 deletions

View File

@ -26,6 +26,8 @@
import java.util.ArrayList;
import java.util.Calendar;
import java.util.List;
import java.util.Map;
import java.util.HashMap;
import java.util.TimeZone;
import java.util.Comparator;
import java.util.stream.Collectors;
@ -111,6 +113,7 @@
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.ACL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -131,8 +134,16 @@
* | |----- SC1
* | |----- SC2
* |--- APPLICATION
* | |----- APP1
* | |----- APP2
* | |----- HIERARCHIES
* | | |----- 1
* | | | |----- (#ApplicationId barring last character)
* | | | | | |----- APP Data
* | | | ....
* | | |
* | | |----- 2
* | | | |----- (#ApplicationId barring last 2 characters)
* | | | | |----- (#Last 2 characters of ApplicationId)
* | | | | | |----- APP Data
* |--- POLICY
* | |----- QUEUE1
* | |----- QUEUE1
@ -194,12 +205,19 @@ public class ZookeeperFederationStateStore implements FederationStateStore {
private int maxAppsInStateStore;
/** Directory to store the delegation token data. **/
private Map<Integer, String> routerAppRootHierarchies;
private String routerRMDTSecretManagerRoot;
private String routerRMDTMasterKeysRootPath;
private String routerRMDelegationTokensRootPath;
private String routerRMSequenceNumberPath;
private String routerRMMasterKeyIdPath;
private int appIdNodeSplitIndex = 0;
private final static int HIERARCHIES_LEVEL = 4;
@VisibleForTesting
public static final String ROUTER_APP_ROOT_HIERARCHIES = "HIERARCHIES";
private volatile Clock clock = SystemClock.getInstance();
protected static final Version CURRENT_VERSION_INFO = Version.newInstance(1, 1);
@ -208,6 +226,27 @@ public class ZookeeperFederationStateStore implements FederationStateStore {
private ZKFederationStateStoreOpDurations opDurations =
ZKFederationStateStoreOpDurations.getInstance();
/*
* Indicates different app attempt state store operations.
*/
private enum AppAttemptOp {
STORE,
UPDATE,
REMOVE
};
/**
* Encapsulates full app node path and corresponding split index.
*/
private final static class AppNodeSplitInfo {
private final String path;
private final int splitIndex;
AppNodeSplitInfo(String path, int splitIndex) {
this.path = path;
this.splitIndex = splitIndex;
}
}
@Override
public void init(Configuration conf) throws YarnException {
@ -234,6 +273,23 @@ public void init(Configuration conf) throws YarnException {
reservationsZNode = getNodePath(baseZNode, ROOT_ZNODE_NAME_RESERVATION);
versionNode = getNodePath(baseZNode, ROOT_ZNODE_NAME_VERSION);
String hierarchiesPath = getNodePath(appsZNode, ROUTER_APP_ROOT_HIERARCHIES);
routerAppRootHierarchies = new HashMap<>();
routerAppRootHierarchies.put(0, appsZNode);
for (int splitIndex = 1; splitIndex <= HIERARCHIES_LEVEL; splitIndex++) {
routerAppRootHierarchies.put(splitIndex,
getNodePath(hierarchiesPath, Integer.toString(splitIndex)));
}
appIdNodeSplitIndex = conf.getInt(YarnConfiguration.ZK_APPID_NODE_SPLIT_INDEX,
YarnConfiguration.DEFAULT_ZK_APPID_NODE_SPLIT_INDEX);
if (appIdNodeSplitIndex < 1 || appIdNodeSplitIndex > HIERARCHIES_LEVEL) {
LOG.info("Invalid value {} for config {} specified. Resetting it to {}",
appIdNodeSplitIndex, YarnConfiguration.ZK_APPID_NODE_SPLIT_INDEX,
YarnConfiguration.DEFAULT_ZK_APPID_NODE_SPLIT_INDEX);
appIdNodeSplitIndex = YarnConfiguration.DEFAULT_ZK_APPID_NODE_SPLIT_INDEX;
}
// delegation token znodes
routerRMDTSecretManagerRoot = getNodePath(baseZNode, ROUTER_RM_DT_SECRET_MANAGER_ROOT);
routerRMDTMasterKeysRootPath = getNodePath(routerRMDTSecretManagerRoot,
@ -250,6 +306,12 @@ public void init(Configuration conf) throws YarnException {
List<ACL> zkAcl = ZKCuratorManager.getZKAcls(conf);
zkManager.createRootDirRecursively(membershipZNode, zkAcl);
zkManager.createRootDirRecursively(appsZNode, zkAcl);
zkManager.createRootDirRecursively(
getNodePath(appsZNode, ROUTER_APP_ROOT_HIERARCHIES));
for (int splitIndex = 1; splitIndex <= HIERARCHIES_LEVEL; splitIndex++) {
zkManager.createRootDirRecursively(
routerAppRootHierarchies.get(splitIndex));
}
zkManager.createRootDirRecursively(policiesZNode, zkAcl);
zkManager.createRootDirRecursively(reservationsZNode, zkAcl);
zkManager.createRootDirRecursively(routerRMDTSecretManagerRoot, zkAcl);
@ -320,6 +382,21 @@ public void close() throws Exception {
}
}
/**
* Register the home {@code SubClusterId} of the newly submitted
* {@code ApplicationId}. Currently response is empty if the operation was
* successful, if not an exception reporting reason for a failure. If a
* mapping for the application already existed, the {@code SubClusterId} in
* this response will return the existing mapping which might be different
* from that in the {@code AddApplicationHomeSubClusterRequest}.
*
* @param request the request to register a new application with its home sub-cluster.
* @return upon successful registration of the application in the StateStore,
* {@code AddApplicationHomeSubClusterRequest} containing the home
* sub-cluster of the application. Otherwise, an exception reporting
* reason for a failure.
* @throws YarnException indicates exceptions from yarn servers.
*/
@Override
public AddApplicationHomeSubClusterResponse addApplicationHomeSubCluster(
AddApplicationHomeSubClusterRequest request) throws YarnException {
@ -367,6 +444,17 @@ public AddApplicationHomeSubClusterResponse addApplicationHomeSubCluster(
throw new YarnException("Cannot addApplicationHomeSubCluster by request");
}
/**
* Update the home {@code SubClusterId} of a previously submitted
* {@code ApplicationId}. Currently response is empty if the operation was
* successful, if not an exception reporting reason for a failure.
*
* @param request the request to update the home sub-cluster of an
* application.
* @return empty on successful update of the application in the StateStore, if
* not an exception reporting reason for a failure
* @throws YarnException indicates exceptions from yarn servers.
*/
@Override
public UpdateApplicationHomeSubClusterResponse updateApplicationHomeSubCluster(
UpdateApplicationHomeSubClusterRequest request) throws YarnException {
@ -402,6 +490,15 @@ public UpdateApplicationHomeSubClusterResponse updateApplicationHomeSubCluster(
return UpdateApplicationHomeSubClusterResponse.newInstance();
}
/**
* Get information about the application identified by the input
* {@code ApplicationId}.
*
* @param request contains the application queried
* @return {@code ApplicationHomeSubCluster} containing the application's home
* subcluster
* @throws YarnException indicates exceptions from yarn servers.
*/
@Override
public GetApplicationHomeSubClusterResponse getApplicationHomeSubCluster(
GetApplicationHomeSubClusterRequest request) throws YarnException {
@ -437,6 +534,14 @@ public GetApplicationHomeSubClusterResponse getApplicationHomeSubCluster(
subClusterId, createTime);
}
/**
* Get the {@code ApplicationHomeSubCluster} list representing the mapping of
* all submitted applications to it's home sub-cluster.
*
* @param request empty representing all applications
* @return the mapping of all submitted application to it's home sub-cluster
* @throws YarnException indicates exceptions from yarn servers.
*/
@Override
public GetApplicationsHomeSubClusterResponse getApplicationsHomeSubCluster(
GetApplicationsHomeSubClusterRequest request) throws YarnException {
@ -448,9 +553,7 @@ public GetApplicationsHomeSubClusterResponse getApplicationsHomeSubCluster(
try {
long start = clock.getTime();
SubClusterId requestSC = request.getSubClusterId();
List<String> children = zkManager.getChildren(appsZNode);
List<ApplicationHomeSubCluster> result = children.stream()
.map(child -> generateAppHomeSC(child))
List<ApplicationHomeSubCluster> result = loadRouterApplications().stream()
.sorted(Comparator.comparing(ApplicationHomeSubCluster::getCreateTime).reversed())
.filter(appHomeSC -> filterHomeSubCluster(requestSC, appHomeSC.getHomeSubCluster()))
.limit(maxAppsInStateStore)
@ -467,48 +570,51 @@ public GetApplicationsHomeSubClusterResponse getApplicationsHomeSubCluster(
throw new YarnException("Cannot get app by request");
}
private ApplicationHomeSubCluster generateAppHomeSC(String appId) {
try {
// Parse ApplicationHomeSubCluster
ApplicationId applicationId = ApplicationId.fromString(appId);
ApplicationHomeSubCluster zkStoreApplicationHomeSubCluster =
getApplicationHomeSubCluster(applicationId);
// Prepare to return data
SubClusterId subClusterId = zkStoreApplicationHomeSubCluster.getHomeSubCluster();
ApplicationHomeSubCluster resultApplicationHomeSubCluster =
ApplicationHomeSubCluster.newInstance(applicationId, subClusterId);
return resultApplicationHomeSubCluster;
} catch (Exception ex) {
LOG.error("get homeSubCluster by appId = {}.", appId, ex);
}
return null;
}
/**
* Delete the mapping of home {@code SubClusterId} of a previously submitted
* {@code ApplicationId}. Currently response is empty if the operation was
* successful, if not an exception reporting reason for a failure.
*
* @param request the request to delete the home sub-cluster of an
* application.
* @return empty on successful update of the application in the StateStore, if
* not an exception reporting reason for a failure
* @throws YarnException if the request is invalid/fails
*/
@Override
public DeleteApplicationHomeSubClusterResponse
deleteApplicationHomeSubCluster(
DeleteApplicationHomeSubClusterRequest request)
throws YarnException {
public DeleteApplicationHomeSubClusterResponse deleteApplicationHomeSubCluster(
DeleteApplicationHomeSubClusterRequest request) throws YarnException {
long start = clock.getTime();
FederationApplicationHomeSubClusterStoreInputValidator.validate(request);
ApplicationId appId = request.getApplicationId();
String appZNode = getNodePath(appsZNode, appId.toString());
String appIdRemovePath = getLeafAppIdNodePath(appId.toString(), false);
int splitIndex = appIdNodeSplitIndex;
boolean exists = false;
boolean exists = true;
try {
exists = zkManager.exists(appZNode);
if (!exists(appIdRemovePath)) {
AppNodeSplitInfo alternatePathInfo = getAlternatePath(appId.toString());
if (alternatePathInfo != null) {
appIdRemovePath = alternatePathInfo.path;
splitIndex = alternatePathInfo.splitIndex;
} else {
exists = false;
}
}
} catch (Exception e) {
String errMsg = "Cannot check app: " + e.getMessage();
FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
}
if (!exists) {
String errMsg = "Application " + appId + " does not exist";
FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
}
try {
zkManager.delete(appZNode);
zkManager.delete(appIdRemovePath);
// Check if we should remove the parent app node as well.
checkRemoveParentAppNode(appIdRemovePath, splitIndex);
} catch (Exception e) {
String errMsg = "Cannot delete app: " + e.getMessage();
FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
@ -730,11 +836,12 @@ public void storeVersion() throws Exception {
*
* @param appId Application identifier.
* @return ApplicationHomeSubCluster identifier.
* @throws Exception If it cannot contact ZooKeeper.
* @throws YarnException If it cannot contact ZooKeeper.
*/
private ApplicationHomeSubCluster getApplicationHomeSubCluster(
final ApplicationId appId) throws YarnException {
String appZNode = getNodePath(appsZNode, appId.toString());
final ApplicationId appId) throws YarnException{
String appZNode = getLeafAppIdNodePath(appId.toString(), false);
ApplicationHomeSubCluster appHomeSubCluster = null;
byte[] data = get(appZNode);
@ -761,11 +868,44 @@ private ApplicationHomeSubCluster getApplicationHomeSubCluster(
private void storeOrUpdateApplicationHomeSubCluster(final ApplicationId applicationId,
final ApplicationHomeSubCluster applicationHomeSubCluster, boolean update)
throws YarnException {
String appZNode = getNodePath(appsZNode, applicationId.toString());
ApplicationHomeSubClusterProto proto =
((ApplicationHomeSubClusterPBImpl) applicationHomeSubCluster).getProto();
byte[] data = proto.toByteArray();
put(appZNode, data, update);
try {
ApplicationHomeSubClusterProto proto =
((ApplicationHomeSubClusterPBImpl) applicationHomeSubCluster).getProto();
byte[] data = proto.toByteArray();
if (update) {
updateApplicationStateInternal(applicationId, data);
} else {
storeApplicationStateInternal(applicationId, data);
}
} catch (Exception e) {
throw new YarnException(e);
}
}
protected void storeApplicationStateInternal(final ApplicationId applicationId, byte[] data)
throws Exception {
String nodeCreatePath = getLeafAppIdNodePath(applicationId.toString(), true);
LOG.debug("Storing info for app: {} at: {}.", applicationId, nodeCreatePath);
put(nodeCreatePath, data, false);
}
protected void updateApplicationStateInternal(final ApplicationId applicationId, byte[] data)
throws Exception {
String nodeUpdatePath = getLeafAppIdNodePath(applicationId.toString(), false);
if (!exists(nodeUpdatePath)) {
AppNodeSplitInfo alternatePathInfo = getAlternatePath(applicationId.toString());
if (alternatePathInfo != null) {
nodeUpdatePath = alternatePathInfo.path;
} else if (appIdNodeSplitIndex != 0) {
// No alternate path exists. Create path as per configured split index.
String rootNode = getSplitAppNodeParent(nodeUpdatePath, appIdNodeSplitIndex);
if (!exists(rootNode)) {
zkManager.create(rootNode);
}
}
}
LOG.debug("Storing final state info for app: {} at: {}.", applicationId, nodeUpdatePath);
put(nodeUpdatePath, data, true);
}
/**
@ -1674,4 +1814,198 @@ public int incrementCurrentKeyId() {
}
return keyIdSeqCounter.getCount();
}
/**
* Get parent app node path based on full path and split index supplied.
* @param appIdPath App id path for which parent needs to be returned.
* @param splitIndex split index.
* @return parent app node path.
*/
private String getSplitAppNodeParent(String appIdPath, int splitIndex) {
// Calculated as string upto index (appIdPath Length - split index - 1). We
// deduct 1 to exclude path separator.
return appIdPath.substring(0, appIdPath.length() - splitIndex - 1);
}
/**
* Checks if parent app node has no leaf nodes and if it does not have,
* removes it. Called while removing application.
*
* @param appIdPath path of app id to be removed.
* @param splitIndex split index.
* @throws Exception if any problem occurs while performing ZK operation.
*/
private void checkRemoveParentAppNode(String appIdPath, int splitIndex)
throws Exception {
if (splitIndex == 0) {
return;
}
String parentAppNode = getSplitAppNodeParent(appIdPath, splitIndex);
List<String> children;
try {
children = getChildren(parentAppNode);
} catch (KeeperException.NoNodeException ke) {
// It should be fine to swallow this exception as the parent app node we
// intend to delete is already deleted.
LOG.debug("Unable to remove app parent node {} as it does not exist.",
parentAppNode);
return;
}
// If children==null or children is not empty, we cannot delete the parent path.
if (children == null || !children.isEmpty()) {
return;
}
// No apps stored under parent path.
try {
zkManager.delete(parentAppNode);
LOG.debug("No leaf app node exists. Removing parent node {}.", parentAppNode);
} catch (KeeperException.NotEmptyException ke) {
// It should be fine to swallow this exception as the parent app node
// has to be deleted only if it has no children. And this node has.
LOG.debug("Unable to remove app parent node {} as it has children.",
parentAppNode);
}
}
List<String> getChildren(final String path) throws Exception {
return zkManager.getChildren(path);
}
/**
* Get alternate path for app id if path according to configured split index
* does not exist. We look for path based on all possible split indices.
* @param appId
* @return a {@link AppNodeSplitInfo} object containing the path and split
* index if it exists, null otherwise.
* @throws Exception if any problem occurs while performing ZK operation.
*/
private AppNodeSplitInfo getAlternatePath(String appId) throws Exception {
for (Map.Entry<Integer, String> entry : routerAppRootHierarchies.entrySet()) {
// Look for other paths
int splitIndex = entry.getKey();
if (splitIndex != appIdNodeSplitIndex) {
String alternatePath =
getLeafAppIdNodePath(appId, entry.getValue(), splitIndex, false);
if (exists(alternatePath)) {
return new AppNodeSplitInfo(alternatePath, splitIndex);
}
}
}
return null;
}
/**
* Returns leaf app node path based on app id and passed split index. If the
* passed flag createParentIfNotExists is true, also creates the parent app
* node if it does not exist.
* @param appId application id.
* @param rootNode app root node based on split index.
* @param appIdNodeSplitIdx split index.
* @param createParentIfNotExists flag which determines if parent app node
* needs to be created(as per split) if it does not exist.
* @return leaf app node path.
* @throws Exception if any problem occurs while performing ZK operation.
*/
private String getLeafAppIdNodePath(String appId, String rootNode,
int appIdNodeSplitIdx, boolean createParentIfNotExists) throws Exception {
if (appIdNodeSplitIdx == 0) {
return getNodePath(rootNode, appId);
}
String nodeName = appId;
int splitIdx = nodeName.length() - appIdNodeSplitIdx;
String rootNodePath = getNodePath(rootNode, nodeName.substring(0, splitIdx));
if (createParentIfNotExists && !exists(rootNodePath)) {
try {
zkManager.create(rootNodePath);
} catch (KeeperException.NodeExistsException e) {
LOG.debug("Unable to create app parent node {} as it already exists.", rootNodePath);
}
}
return getNodePath(rootNodePath, nodeName.substring(splitIdx));
}
/**
* Returns leaf app node path based on app id and configured split index. If
* the passed flag createParentIfNotExists is true, also creates the parent
* app node if it does not exist.
* @param appId application id.
* @param createParentIfNotExists flag which determines if parent app node
* needs to be created(as per split) if it does not exist.
* @return leaf app node path.
* @throws YarnException if any problem occurs while performing ZK operation.
*/
private String getLeafAppIdNodePath(String appId,
boolean createParentIfNotExists) throws YarnException {
try {
String rootNode = routerAppRootHierarchies.get(appIdNodeSplitIndex);
return getLeafAppIdNodePath(appId, rootNode, appIdNodeSplitIndex, createParentIfNotExists);
} catch (Exception e) {
throw new YarnException(e);
}
}
private ApplicationHomeSubCluster loadRouterAppStateFromAppNode(String appNodePath)
throws Exception {
byte[] data = get(appNodePath);
LOG.debug("Loading application from znode: {}", appNodePath);
ApplicationHomeSubCluster appHomeSubCluster = null;
if (data == null) {
return appHomeSubCluster;
}
try {
appHomeSubCluster = new ApplicationHomeSubClusterPBImpl(
ApplicationHomeSubClusterProto.parseFrom(data));
} catch (InvalidProtocolBufferException e) {
String errMsg = "Cannot parse application at " + appNodePath;
FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
}
return appHomeSubCluster;
}
private List<ApplicationHomeSubCluster> loadRouterApplications() throws Exception {
List<ApplicationHomeSubCluster> applicationHomeSubClusters = new ArrayList<>();
for (int splitIndex = 0; splitIndex <= 4; splitIndex++) {
String appRoot = routerAppRootHierarchies.get(splitIndex);
if (appRoot == null) {
continue;
}
List<String> childNodes = getChildren(appRoot);
boolean appNodeFound = false;
for (String childNodeName : childNodes) {
if (childNodeName.startsWith(ApplicationId.appIdStrPrefix)) {
appNodeFound = true;
if (splitIndex == 0) {
ApplicationHomeSubCluster applicationHomeSubCluster =
loadRouterAppStateFromAppNode(getNodePath(appRoot, childNodeName));
applicationHomeSubClusters.add(applicationHomeSubCluster);
} else {
// If AppId Node is partitioned.
String parentNodePath = getNodePath(appRoot, childNodeName);
List<String> leafNodes = getChildren(parentNodePath);
for (String leafNodeName : leafNodes) {
ApplicationHomeSubCluster applicationHomeSubCluster =
loadRouterAppStateFromAppNode(getNodePath(parentNodePath, leafNodeName));
applicationHomeSubClusters.add(applicationHomeSubCluster);
}
}
} else if (!childNodeName.equals(ROUTER_APP_ROOT_HIERARCHIES)){
LOG.debug("Unknown child node with name {} under {}.", childNodeName, appRoot);
}
}
if (splitIndex != appIdNodeSplitIndex && !appNodeFound) {
// If no loaded app exists for a particular split index and the split
// index for which apps are being loaded is not the one configured, then
// we do not need to keep track of this hierarchy for storing/updating/
// removing app/app attempt znodes.
routerAppRootHierarchies.remove(splitIndex);
}
}
return applicationHomeSubClusters;
}
}

View File

@ -0,0 +1,22 @@
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. See accompanying LICENSE file.
-->
<configuration>
<property>
<name>yarn.resourcemanager.zk-appid-node.split-index</name>
<value>1</value>
</property>
</configuration>