From 0a867b9c39012bde4461ff45306c99bada74e5e8 Mon Sep 17 00:00:00 2001 From: slfan1989 <55643692+slfan1989@users.noreply.github.com> Date: Sat, 4 Nov 2023 19:09:48 +0800 Subject: [PATCH] YARN-11594. [Federation] Improve Yarn Federation documentation. (#6209) Contributed by Shilun Fan. Reviewed-by: Inigo Goiri Signed-off-by: Shilun Fan --- .../src/site/markdown/Federation.md | 422 ++++++++++++------ 1 file changed, 296 insertions(+), 126 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/Federation.md b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/Federation.md index 5d5dc786e1..631a62896a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/Federation.md +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/Federation.md @@ -24,7 +24,7 @@ This document described a federation-based approach to scale a single YARN clust The applications running in this federated environment will see a single massive YARN cluster and will be able to schedule tasks on any node of the federated cluster. Under the hood, the federation system will negotiate with sub-clusters resource managers and provide resources to the application. The goal is to allow an individual job to “span” sub-clusters seamlessly. This design is structurally scalable, as we bound the number of nodes each RM is responsible for, and appropriate policies, will try to ensure that the majority of applications will reside within a single sub-cluster, thus the number of applications each RM will see is also bounded. This means we could almost linearly scale, by simply adding sub-clusters (as very little coordination is needed across them). -This architecture can provide very tight enforcement of scheduling invariants within each sub-cluster (simply inherits from YARN), while continuous rebalancing across subcluster will enforce (less strictly) that these properties are also respected at a global level (e.g., if a sub-cluster loses a large number of nodes, we could re-map queues to other sub-clusters to ensure users running on the impaired sub-cluster are not unfairly affected). +This architecture can provide very tight enforcement of scheduling invariants within each sub-cluster (simply inherits from YARN), while continuous re-balancing across sub-cluster will enforce (less strictly) that these properties are also respected at a global level (e.g., if a sub-cluster loses a large number of nodes, we could re-map queues to other sub-clusters to ensure users running on the impaired sub-cluster are not unfairly affected). Federation is designed as a “layer” atop of existing YARN codebase, with limited changes in the core YARN mechanisms. @@ -78,7 +78,7 @@ to minimize overhead on the scheduling infrastructure (more in section on scalab 3. Intercepts all the requests, thus it can enforce application quotas, which would not be enforceable by sub-cluster RM (as each only see a fraction of the AM requests). 4. The AMRMProxy can enforce load-balancing / overflow policies. -###Global Policy Generator +### Global Policy Generator Global Policy Generator overlooks the entire federation and ensures that the system is configured and tuned properly all the time. A key design point is that the cluster availability does not depend on an always-on GPG. 
The GPG operates continuously but out-of-band from all cluster operations, and provide us with a unique vantage point, that allows to enforce global invariants, affect load balancing, trigger draining of sub-clusters that will undergo maintenance, etc. @@ -94,19 +94,19 @@ This part of the federation system is part of future work in [YARN-5597](https:/ ### Federation State-Store The Federation State defines the additional state that needs to be maintained to loosely couple multiple individual sub-clusters into a single large federated cluster. This includes the following information: -####Sub-cluster Membership +#### Sub-cluster Membership The member YARN RMs continuously heartbeat to the state store to keep alive and publish their current capability/load information. This information is used by the -Global Policy Generator (GPG) to make proper policy decisions. Also this information can be used by routers to select the best home sub-cluster. This mechanism allows +Global Policy Generator (GPG) to make proper policy decisions. Also, this information can be used by routers to select the best home sub-cluster. This mechanism allows us to dynamically grow/shrink the “cluster fleet” by adding or removing sub-clusters. This also allows for easy maintenance of each sub-cluster. This is new functionality that needs to be added to the YARN RM but the mechanisms are well understood as it’s similar to individual YARN RM HA. -####Application’s Home Sub-cluster +#### Application’s Home Sub-cluster The sub-cluster on which the Application Master (AM) runs is called the Application’s “home sub-cluster”. The AM is not limited to resources from the home sub-cluster but can also request resources from other sub-clusters, referred to as secondary sub-clusters. The federated environment will be configured and tuned periodically such that when an AM is placed on a sub-cluster, it should be able to find most of the resources on the home sub-cluster. Only in certain cases it should need to ask for resources from other sub-clusters. -###Federation Policy Store +### Federation Policy Store The federation Policy Store is a logically separate store (while it might be backed by the same physical component), which contains information about how applications and resource requests are routed to different sub-clusters. The current implementation provides @@ -161,13 +161,12 @@ Configuration These are common configurations that should appear in the **conf/yarn-site.xml** at each machine in the federation. +| Property | Example | Description | +|:----------------------------------|:-------------------------|:----------------------------------------------------------------------------| +| `yarn.federation.enabled` | `true` | Whether federation is enabled or not | +| `yarn.resourcemanager.cluster-id` | `` | The unique subcluster identifier for this RM (same as the one used for HA). | -| Property | Example | Description | -|:---- |:---- |:---- | -|`yarn.federation.enabled` | `true` | Whether federation is enabled or not | -|`yarn.resourcemanager.cluster-id` | `` | The unique subcluster identifier for this RM (same as the one used for HA). | - -#### State-Store: +#### How to configure State-Store Currently, we support ZooKeeper and SQL based implementations of the state-store. 
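For orientation, a minimal ZooKeeper-backed configuration might look like the following sketch (the sub-cluster id and the ZooKeeper addresses are placeholders to be replaced with your own values); the individual options for each backend are listed in the tables below.

```xml
<configuration>
  <!-- Common federation settings (see the table above). -->
  <property>
    <name>yarn.federation.enabled</name>
    <value>true</value>
  </property>
  <property>
    <!-- Placeholder: the unique sub-cluster identifier for this RM. -->
    <name>yarn.resourcemanager.cluster-id</name>
    <value>SC-1</value>
  </property>

  <!-- ZooKeeper-based Federation State-Store. -->
  <property>
    <name>yarn.federation.state-store.class</name>
    <value>org.apache.hadoop.yarn.server.federation.store.impl.ZookeeperFederationStateStore</value>
  </property>
  <property>
    <!-- Placeholder: the address of your ZooKeeper ensemble. -->
    <name>hadoop.zk.address</name>
    <value>zk1.example.com:2181,zk2.example.com:2181,zk3.example.com:2181</value>
  </property>
</configuration>
```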
@@ -175,20 +174,26 @@ Currently, we support ZooKeeper and SQL based implementations of the state-store ZooKeeper: one must set the ZooKeeper settings for Hadoop: -| Property | Example | Description | -|:---- |:---- |:---- | -|`yarn.federation.state-store.class` | `org.apache.hadoop.yarn.server.federation.store.impl.ZookeeperFederationStateStore` | The type of state-store to use. | -|`hadoop.zk.address` | `host:port` | The address for the ZooKeeper ensemble. | +| Property | Example | Description | +|:------------------------------------|:------------------------------------------------------------------------------------|:----------------------------------------| +| `yarn.federation.state-store.class` | `org.apache.hadoop.yarn.server.federation.store.impl.ZookeeperFederationStateStore` | The type of state-store to use. | +| `hadoop.zk.address` | `host:port` | The address for the ZooKeeper ensemble. | SQL: one must setup the following parameters: -| Property | Example | Description | -|:---- |:---- |:---- | -|`yarn.federation.state-store.class` | `org.apache.hadoop.yarn.server.federation.store.impl.SQLFederationStateStore` | The type of state-store to use. | -|`yarn.federation.state-store.sql.url` | `jdbc:mysql://:/FederationStateStore` | For SQLFederationStateStore the name of the DB where the state is stored. | -|`yarn.federation.state-store.sql.jdbc-class` | `com.mysql.jdbc.jdbc2.optional.MysqlDataSource` | For SQLFederationStateStore the jdbc class to use. | -|`yarn.federation.state-store.sql.username` | `` | For SQLFederationStateStore the username for the DB connection. | -|`yarn.federation.state-store.sql.password` | `` | For SQLFederationStateStore the password for the DB connection. | +| Property | Example | Description | +|:--------------------------------------------------|:------------------------------------------------------------------------------|:---------------------------------------------------------------------------------------------------------| +| `yarn.federation.state-store.class` | `org.apache.hadoop.yarn.server.federation.store.impl.SQLFederationStateStore` | The type of state-store to use. | +| `yarn.federation.state-store.sql.url` | `jdbc:mysql://:/FederationStateStore` | For SQLFederationStateStore the name of the DB where the state is stored. | +| `yarn.federation.state-store.sql.jdbc-class` | `com.mysql.jdbc.jdbc2.optional.MysqlDataSource` | For SQLFederationStateStore the jdbc class to use. | +| `yarn.federation.state-store.sql.username` | `` | For SQLFederationStateStore the username for the DB connection. | +| `yarn.federation.state-store.sql.password` | `` | For SQLFederationStateStore the password for the DB connection. | +| `yarn.federation.state-store.sql.max-connections` | `1` | This is the maximum number of parallel connections each Router makes to the state-store. | +| `yarn.federation.state-store.sql.minimum-idle` | `1` | The property controls the minimum number of idle connections that HikariCP trie to maintain in the pool. | +| `yarn.federation.state-store.sql.pool-name` | `YARN-Federation-DataBasePool` | Specifies the name of the connection pool used by the FederationSQLStateStore. | +| `yarn.federation.state-store.sql.max-life-time` | `30m` | This property controls the maximum lifetime of a connection in the pool. | +| `yarn.federation.state-store.sql.idle-time-out` | `10m` | This property controls the maximum amount of time that a connection is allowed to sit idle in the pool. 
| +| `yarn.federation.state-store.sql.conn-time-out` | `10s` | Set the maximum amount of time that a client will wait for a connection from the pool. | We provide scripts for **MySQL** and **Microsoft SQL Server**. @@ -224,30 +229,32 @@ SQL-Server scripts are located in **sbin/FederationStateStore/SQLServer/**. 4. SQL Server 2017 Enterprise 5. SQL Server 2019 Enterprise -#### Optional: +#### How to configure Optional -| Property | Example | Description | -|:---- |:---- |:---- | -|`yarn.federation.failover.enabled` | `true` | Whether should retry considering RM failover within each subcluster. | -|`yarn.federation.blacklist-subclusters` | `` | A list of black-listed sub-clusters, useful to disable a sub-cluster | -|`yarn.federation.policy-manager` | `org.apache.hadoop.yarn.server.federation.policies.manager.WeightedLocalityPolicyManager` | The choice of policy manager determines how Applications and ResourceRequests are routed through the system. | -|`yarn.federation.policy-manager-params` | `` | The payload that configures the policy. In our example a set of weights for router and amrmproxy policies. This is typically generated by serializing a policymanager that has been configured programmatically, or by populating the state-store with the .json serialized form of it. | -|`yarn.federation.subcluster-resolver.class` | `org.apache.hadoop.yarn.server.federation.resolver.DefaultSubClusterResolverImpl` | The class used to resolve which subcluster a node belongs to, and which subcluster(s) a rack belongs to. | -|`yarn.federation.machine-list` | `` | Path of machine-list file used by `SubClusterResolver`. Each line of the file is a node with sub-cluster and rack information. Below is the example:

node1, subcluster1, rack1
node2, subcluster2, rack1
node3, subcluster3, rack2
node4, subcluster3, rack2 | +| Property | Example | Description | +|:--------------------------------------------|:------------------------------------------------------------------------------------------|:-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `yarn.federation.failover.enabled` | `true` | Whether should retry considering RM failover within each sub-cluster. | +| `yarn.federation.non-ha.enabled` | `false` | If our subCluster's ResourceManager (RM) does not have High Availability (HA) enabled, we can configure this parameter as true. However, it is recommended to use RM HA in a production environment. | +| `yarn.client.failover-proxy-provider` | `org.apache.hadoop.yarn.server.federation.failover.FederationRMFailoverProxyProvider` | A FailoverProxyProvider implementation that uses the FederationStateStore to determine the ResourceManager to connect to. This supports both HA and regular mode which is controlled by configuration. | +| `yarn.federation.blacklist-subclusters` | `` | A list of black-listed sub-clusters, useful to disable a sub-cluster | +| `yarn.federation.policy-manager` | `org.apache.hadoop.yarn.server.federation.policies.manager.WeightedLocalityPolicyManager` | The choice of policy manager determines how Applications and ResourceRequests are routed through the system. | +| `yarn.federation.policy-manager-params` | `` | The payload that configures the policy. In our example a set of weights for router and amrmproxy policies. This is typically generated by serializing a policymanager that has been configured programmatically, or by populating the state-store with the .json serialized form of it. | +| `yarn.federation.subcluster-resolver.class` | `org.apache.hadoop.yarn.server.federation.resolver.DefaultSubClusterResolverImpl` | The class used to resolve which sub-cluster a node belongs to, and which subcluster(s) a rack belongs to. | +| `yarn.federation.machine-list` | `` | Path of machine-list file used by `SubClusterResolver`. Each line of the file is a node with sub-cluster and rack information. Below is the example:

node1, subcluster1, rack1
node2, subcluster2, rack1
node3, subcluster3, rack2
node4, subcluster3, rack2 | -- yarn.federation.policy-manager-params +##### How to configure yarn.federation.policy-manager-params To configure the `yarn.federation.policy-manager-params` parameter, which represents the weight policy for the default queue, and where the relevant information will be parsed as `WeightedPolicyInfo`. We can use the following JSON format for configuration: - ```xml - - yarn.federation.policy-manager-params - {"routerPolicyWeights":{"entry":[{"key":{"id":"SC-2"},"value":"0.3"},{"key":{"id":"SC-1"},"value":"0.7"}]},"amrmPolicyWeights":{"entry":[{"key":{"id":"SC-2"},"value":"0.4"},{"key":{"id":"SC-1"},"value":"0.6"}]},"headroomAlpha":"1.0"} - - ``` +```xml + + yarn.federation.policy-manager-params + {"routerPolicyWeights":{"entry":[{"key":{"id":"SC-2"},"value":"0.3"},{"key":{"id":"SC-1"},"value":"0.7"}]},"amrmPolicyWeights":{"entry":[{"key":{"id":"SC-2"},"value":"0.4"},{"key":{"id":"SC-1"},"value":"0.6"}]},"headroomAlpha":"1.0"} + +``` This JSON configuration allows you to define the weight policy for default queue, where: @@ -255,10 +262,9 @@ SQL-Server scripts are located in **sbin/FederationStateStore/SQLServer/**. - The `amrmPolicyWeights` represents the allocation ratios for Application Master when request containers from different subclusters' RM. For instance, when an AM requests containers, it will request `40%` of the containers from `SC-2` and `60%` of the containers from `SC-1`. - The `headroomAlpha` used by policies that balance weight-based and load-based considerations in their decisions. For policies that use this parameter, values close to 1 indicate that most of the decision should be based on currently observed headroom from various sub-clusters, values close to zero, indicate that the decision should be mostly based on weights and practically ignore current load. -How to configure the policy-manager --------------------- +##### How to configure the policy-manager -Router Policy +**Router Policy** Router Policy defines the logic for determining the routing of an application submission and determines the HomeSubCluster for the application. @@ -284,7 +290,7 @@ Router Policy - WeightedRandomRouterPolicy - This policy implements a weighted random sample among currently active sub-clusters. -AMRM Policy +**AMRM Policy** AMRM Proxy defines the logic to split the resource request list received by AM among RMs. @@ -303,26 +309,26 @@ AMRM Policy - RejectAMRMProxyPolicy - This policy simply rejects all requests. Useful to prevent apps from accessing any sub-cluster. -Policy Manager +**Policy Manager** The PolicyManager is providing a combination of RouterPolicy and AMRMPolicy. We can set policy-manager like this: - ```xml - - - yarn.federation.policy-manager - org.apache.hadoop.yarn.server.federation.policies.manager.HashBroadcastPolicyManager - - ``` +```xml + + + yarn.federation.policy-manager + org.apache.hadoop.yarn.server.federation.policies.manager.HashBroadcastPolicyManager + +``` - HashBroadcastPolicyManager - Policy that routes applications via hashing of their queuename, and broadcast resource requests. This picks a HashBasedRouterPolicy for the router and a BroadcastAMRMProxyPolicy for the amrmproxy as they are designed to work together. @@ -337,8 +343,7 @@ Policy Manager - WeightedLocalityPolicyManager - Policy that allows operator to configure "weights" for routing. This picks a LocalityRouterPolicy for the router and a LocalityMulticastAMRMProxyPolicy for the amrmproxy as they are designed to work together. 
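For example, to combine `WeightedLocalityPolicyManager` with the weight payload shown earlier, the two properties can be set together in **conf/yarn-site.xml**. The snippet below is a sketch; the sub-cluster ids `SC-1`/`SC-2` and the weights are the illustrative values from the example above.

```xml
<configuration>
  <!-- Choose the policy manager (decides both the router and the AMRM policy). -->
  <property>
    <name>yarn.federation.policy-manager</name>
    <value>org.apache.hadoop.yarn.server.federation.policies.manager.WeightedLocalityPolicyManager</value>
  </property>
  <!-- Weight payload, parsed as WeightedPolicyInfo (see the JSON example above). -->
  <property>
    <name>yarn.federation.policy-manager-params</name>
    <value>{"routerPolicyWeights":{"entry":[{"key":{"id":"SC-2"},"value":"0.3"},{"key":{"id":"SC-1"},"value":"0.7"}]},"amrmPolicyWeights":{"entry":[{"key":{"id":"SC-2"},"value":"0.4"},{"key":{"id":"SC-1"},"value":"0.6"}]},"headroomAlpha":"1.0"}</value>
  </property>
</configuration>
```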
-How to configure the queue policy
--------------------
+##### How to configure the queue policy

 We will provide a set of commands to view and save queue policies.

@@ -369,38 +374,120 @@ WeightedPolicyInfo include the following:
   used by policies that balance weight-based and load-based considerations in their decisions. For policies that use this parameter, values close to 1 indicate that most of the decision should be based on currently observed headroom from various sub-clusters, values close to zero, indicate that the decision should be mostly based on weights and practically ignore current load.

+##### How to configure ZookeeperFederationStateStore Hierarchies
+
+Similar to YARN-2962, we have implemented hierarchical storage for applications in the ZooKeeper federation store to manage the number of nodes under a specific Znode.
+
+We can configure `yarn.resourcemanager.zk-appid-node.split-index` (default 0), the index at which the last section of the application id (with each section separated by _ in the application id) will be split, so that the application znode stored in the ZooKeeper state store is stored as two different znodes (parent-child). The split is done from the end.
+For instance, with no split, the appid znode will be of the form application_1352994193343_0001. If the value of this config is 1, the appid znode will be broken into two parts, application_1352994193343_000 and 1 respectively, with the former being the parent node.
+application_1352994193343_0002 will then be stored as 2 under the parent node application_1352994193343_000. This config can take values from 0 to 4. 0 means there will be no split. If the configured value is outside this range, it will be treated as 0 (i.e. no split). A value
+larger than 0 (up to 4) should be configured if you are storing a large number of apps in the ZooKeeper-based state store and state-store operations are failing due to LenError in ZooKeeper.
+
 ### ON RMs:

 These are extra configurations that should appear in the **conf/yarn-site.xml** at each ResourceManager.

-| Property | Example | Description |
-|:---- |:---- |:---- |
-|`yarn.resourcemanager.epoch` | `` | The seed value for the epoch. This is used to guarantee uniqueness of container-IDs generate by different RMs. It must therefore be unique among sub-clusters and `well-spaced` to allow for failures which increment epoch. Increments of 1000 allow for a large number of sub-clusters and practically ensure near-zero chance of collisions (a clash will only happen if a container is still alive for 1000 restarts of one RM, while the next RM never restarted, and an app requests more containers). |
+| Property | Example | Description |
+|:-----------------------------|:-----------------|:----------------------------------------------------------------------------------------------------------------------------|
+| `yarn.resourcemanager.epoch` | `` | The seed value for the epoch. This is used to guarantee uniqueness of container-IDs generated by different RMs. It must therefore be unique among sub-clusters and `well-spaced` to allow for failures which increment epoch. 
Increments of 1000 allow for a large number of sub-clusters and practically ensure near-zero chance of collisions (a clash will only happen if a container is still alive for 1000 restarts of one RM, while the next RM never restarted, and an app requests more containers). |

Optional:

-| Property | Example | Description |
-|:---- |:---- |:---- |
-|`yarn.federation.state-store.heartbeat-interval-secs` | `60` | The rate at which RMs report their membership to the federation to the central state-store. |
+| Property | Example | Description |
+|:------------------------------------------------------|:--------|:--------------------------------------------------------------------------------------------|
+| `yarn.federation.state-store.heartbeat-interval-secs` | `60` | The rate at which RMs report their membership to the federation to the central state-store. |

+**How to configure the cleanup of applications**
+
+The Router supports storing scheduled applications in the StateStore. However, as the number of applications increases, it's essential to provide a mechanism for application cleanup.
+We have implemented an automatic cleanup method in the ResourceManager (RM), which attempts to clean up the data in the StateStore after an application has completed its execution and has been removed from the RM's memory.
+Additionally, to account for certain exceptional cases where some applications may not be cleaned up properly, when the RM starts, we utilize a separate thread to attempt cleanup of completed applications.
+We can refer to [YARN-11323](https://issues.apache.org/jira/browse/YARN-11323).
+
+| Property | Example | Description |
+|:---------------------------------------------------------|:--------|:----------------------------------------------------------------------------------------------------------------------------------------------------------|
+| `yarn.federation.state-store.clean-up-retry-count` | `1` | The number of retries for cleaning up an application in the FederationStateStore. The default value is 1, i.e. if a cleanup attempt fails, it will be retried once. |
+| `yarn.federation.state-store.clean-up-retry-sleep-time` | `1s` | The sleep time between cleanup retries in the FederationStateStore. When a cleanup attempt fails, the RM sleeps for this period and then retries. The default value is 1s. |

 ### ON ROUTER:

+#### How to select Router Mode
+
+The Router supports both `Federation` mode and `Non-Federation` mode.
+
+- Non-Federation mode
+
+The Router simply forwards client requests to the cluster's ResourceManager. In this mode, all we need to do is configure the cluster's ResourceManager addresses in the Router's **conf/yarn-site.xml**.
+
+- Federation mode
+
+The Router distributes client requests to the ResourceManagers of different sub-clusters based on the user-configured queue policy. In this mode, we need to configure items such as interceptors, the state store, and other settings.
+
 These are extra configurations that should appear in the **conf/yarn-site.xml** at each Router. 
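As a quick orientation before the detailed tables, a Federation-mode Router's **conf/yarn-site.xml** typically carries settings along the following lines. This is a sketch only: the state-store settings mirror the ones configured on the RMs, the ZooKeeper address is a placeholder, and the interceptor pipelines are explained in the next section.

```xml
<configuration>
  <property>
    <name>yarn.federation.enabled</name>
    <value>true</value>
  </property>
  <!-- Same state-store settings as on the ResourceManagers. -->
  <property>
    <name>yarn.federation.state-store.class</name>
    <value>org.apache.hadoop.yarn.server.federation.store.impl.ZookeeperFederationStateStore</value>
  </property>
  <property>
    <!-- Placeholder: your ZooKeeper ensemble. -->
    <name>hadoop.zk.address</name>
    <value>zk1.example.com:2181</value>
  </property>
  <property>
    <name>yarn.router.bind-host</name>
    <value>0.0.0.0</value>
  </property>
  <!-- Interceptor pipelines; the Federation interceptors must be the last step. -->
  <property>
    <name>yarn.router.clientrm.interceptor-class.pipeline</name>
    <value>org.apache.hadoop.yarn.server.router.clientrm.FederationClientInterceptor</value>
  </property>
  <property>
    <name>yarn.router.rmadmin.interceptor-class.pipeline</name>
    <value>org.apache.hadoop.yarn.server.router.rmadmin.FederationRMAdminInterceptor</value>
  </property>
  <property>
    <name>yarn.router.webapp.interceptor-class.pipeline</name>
    <value>org.apache.hadoop.yarn.server.router.webapp.FederationInterceptorREST</value>
  </property>
</configuration>
```

The individual properties are described in the tables that follow.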
| Property | Example | Description | |:--------------------------------------------------|:----------------------------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | `yarn.router.bind-host` | `0.0.0.0` | Host IP to bind the router to. The actual address the server will bind to. If this optional address is set, the RPC and webapp servers will bind to this address and the port specified in yarn.router.*.address respectively. This is most useful for making Router listen to all interfaces by setting to 0.0.0.0. | | `yarn.router.clientrm.interceptor-class.pipeline` | `org.apache.hadoop.yarn.server.router.clientrm.FederationClientInterceptor` | A comma-separated list of interceptor classes to be run at the router when interfacing with the client. The last step of this pipeline must be the Federation Client Interceptor. | +| `yarn.router.rmadmin.interceptor-class.pipeline` | `org.apache.hadoop.yarn.server.router.rmadmin.FederationRMAdminInterceptor` | A comma-separated list of interceptor classes to be run at the router when interfacing with the client via Admin interface. The last step of this pipeline must be the Federation Admin Interceptor. | +| `yarn.router.webapp.interceptor-class.pipeline` | `org.apache.hadoop.yarn.server.router.webapp.FederationInterceptorREST` | A comma-separated list of interceptor classes to be run at the router when interfacing with the client via REST interface. The last step of this pipeline must be the Federation Interceptor REST. | -> Enable ApplicationSubmissionContextInterceptor +#### How to configure Router interceptor + +- yarn.router.clientrm.interceptor-class.pipeline + +The role of the interceptor is to forward client requests to RM. + +| Property | Mode | Description | +|:----------------------------------------------------------------------------------------|:-----------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `org.apache.hadoop.yarn.server.router.clientrm.DefaultClientRequestInterceptor` | `Non-Federation` | That simply forwards the client requests to the cluster resource manager. | +| `org.apache.hadoop.yarn.server.router.clientrm.PassThroughClientRequestInterceptor` | `Federation` | Interceptor that does not do anything other than forwarding it to the next Interceptor in the chain. | +| `org.apache.hadoop.yarn.server.router.clientrm.FederationClientInterceptor` | `Federation` | This Class provides an implementation for federation of YARN RM and scaling an application across multiple YARN SubClusters. All the federation specific implementation is encapsulated in this class. This is always the last interceptor in the chain. | +| `org.apache.hadoop.yarn.server.router.clientrm.ApplicationSubmissionContextInterceptor` | `Federation` | It prevents DoS attack over the ApplicationClientProtocol. Currently, it checks the size of the ApplicationSubmissionContext. If it exceeds the limit it can cause Zookeeper failures. 
| + +**How to configure the thread pool of FederationClientInterceptor** + +The FederationClientInterceptor retrieves data from multiple subClusters. To improve performance, we utilize a thread pool for concurrent access to multiple subClusters. +Below is the configuration for the thread pool, which can be adjusted based on the requirements of the production environment. + +| Property | Mode | Description | +|:----------------------------------------------------------------------|:--------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `yarn.router.interceptor.user-thread-pool.minimum-pool-size` | `5` | This configurable is used to set the corePoolSize(minimumPoolSize) of the thread pool of the interceptor. Default is 5. | +| `yarn.router.interceptor.user-thread-pool.maximum-pool-size` | `5` | This configuration is used to set the default value of maximumPoolSize of the thread pool of the interceptor. Default is 5. | +| `yarn.router.interceptor.user-thread-pool.keep-alive-time` | `0s` | This configurable is used to set the keepAliveTime of the thread pool of the interceptor. Default is 0s. | +| `yarn.router.interceptor.user-thread-pool.allow-core-thread-time-out` | `false` | This method configures the policy for core threads regarding termination when no tasks arrive within the keep-alive time. If set to true, We need to ensure that yarn.router.interceptor.user-thread-pool.keep-alive-time is greater than 0. | + +- yarn.router.rmadmin.interceptor-class.pipeline + +The role of the interceptor is to forward client's administrator requests to RM. + +| Property | Mode | Description | +|:--------------------------------------------------------------------------------|:-----------------|:----------------------------------------------------------------------------------------------------------------------------------------------------------| +| `org.apache.hadoop.yarn.server.router.rmadmin.DefaultRMAdminRequestInterceptor` | `Non-Federation` | That simply forwards the client requests to the cluster resource manager. | +| `org.apache.hadoop.yarn.server.router.rmadmin.FederationRMAdminInterceptor` | `Federation` | Intercept the client's administrator request and forward it to the ResourceManager of Yarn SubClusters. This is always the last interceptor in the chain. | + +**How to configure the thread pool of FederationRMAdminInterceptor** + +The thread pool configuration used by the `FederationRMAdminInterceptor` is consistent with that of the `FederationClientInterceptor` and can be directly referenced for configuration. + +- yarn.router.webapp.interceptor-class.pipeline + +The role of the interceptor is to forward client Rest requests to RM. + +| Property | Mode | Description | +|:----------------------------------------------------------------------------|:-----------------|:-------------------------------------------------------------------------------------------------------------------------------------------------| +| `org.apache.hadoop.yarn.server.router.webapp.DefaultRequestInterceptorREST` | `Non-Federation` | That simply forwards the client requests to the cluster resource manager. | +| `org.apache.hadoop.yarn.server.router.webapp.FederationInterceptorREST` | `Federation` | Intercept the client's Rest request and forward it to the ResourceManager of Yarn SubClusters. 
This is always the last interceptor in the chain. | + +How to enable ApplicationSubmissionContextInterceptor - If the `FederationStateStore` is configured with `Zookpeer` storage, the app information will be stored in `Zookpeer`. If the size of the app information exceeds `1MB`, `Zookpeer` may fail. `ApplicationSubmissionContextInterceptor` will check the size of `ApplicationSubmissionContext`, if the size exceeds the limit(default 1MB), an exception will be thrown. - - The size of the ApplicationSubmissionContext of the application application_123456789_0001 is above the limit. Size = 1.02 MB. + +- The size of the ApplicationSubmissionContext of the application `application_123456789_0001` is above the limit. Size = 1.02 MB. - The required configuration is as follows: -``` +```xml yarn.router.clientrm.interceptor-class.pipeline org.apache.hadoop.yarn.server.router.clientrm.PassThroughClientRequestInterceptor, @@ -415,42 +502,40 @@ These are extra configurations that should appear in the **conf/yarn-site.xml** Optional: -| Property | Example | Description | -|:---- |:---- |:---- | -|`yarn.router.hostname` | `0.0.0.0` | Router host name. -|`yarn.router.clientrm.address` | `0.0.0.0:8050` | Router client address. | -|`yarn.router.webapp.address` | `0.0.0.0:8089` | Webapp address at the router. | -|`yarn.router.admin.address` | `0.0.0.0:8052` | Admin address at the router. | -|`yarn.router.webapp.https.address` | `0.0.0.0:8091` | Secure webapp address at the router. | -|`yarn.router.submit.retry` | `3` | The number of retries in the router before we give up. | -|`yarn.router.submit.interval.time` | `10ms` | The interval between two retry, the default value is 10ms. | -|`yarn.federation.statestore.max-connections` | `10` | This is the maximum number of parallel connections each Router makes to the state-store. | -|`yarn.federation.cache-ttl.secs` | `60` | The Router caches informations, and this is the time to leave before the cache is invalidated. | -|`yarn.federation.cache.class` | `org.apache.hadoop.yarn.server.federation.cache.FederationJCache` | The Router caches informations, We can configure the Cache implementation and the default implementation is FederationJCache.| -|`yarn.router.webapp.interceptor-class.pipeline` | `org.apache.hadoop.yarn.server.router.webapp.FederationInterceptorREST` | A comma-separated list of interceptor classes to be run at the router when interfacing with the client via REST interface. The last step of this pipeline must be the Federation Interceptor REST. | +| Property | Example | Description | +|:------------------------------------------------|:------------------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `yarn.router.hostname` | `0.0.0.0` | Router host name. | +| `yarn.router.clientrm.address` | `0.0.0.0:8050` | Router client address. | +| `yarn.router.webapp.address` | `0.0.0.0:8089` | Webapp address at the router. | +| `yarn.router.admin.address` | `0.0.0.0:8052` | Admin address at the router. | +| `yarn.router.webapp.https.address` | `0.0.0.0:8091` | Secure webapp address at the router. | +| `yarn.router.submit.retry` | `3` | The number of retries in the router before we give up. | +| `yarn.router.submit.interval.time` | `10ms` | The interval between two retry, the default value is 10ms. 
| +| `yarn.federation.cache-ttl.secs` | `300s` | The Router caches informations, and this is the time to leave before the cache is invalidated. | +| `yarn.federation.cache.class` | `org.apache.hadoop.yarn.server.federation.cache.FederationJCache` | The Router caches informations, We can configure the Cache implementation and the default implementation is FederationJCache. | -Security: +#### How to configure Router security Kerberos supported in federation. -| Property | Example | Description | -|:---- |:---- |:---- | -| `yarn.router.keytab.file` | | The keytab file used by router to login as its service principal. The principal name is configured with 'yarn.router.kerberos.principal'.| -| `yarn.router.kerberos.principal` | | The Router service principal. This is typically set to router/_HOST@REALM.TLD. Each Router will substitute _HOST with its own fully qualified hostname at startup. The _HOST placeholder allows using the same configuration setting on all Routers in setup. | -| `yarn.router.kerberos.principal.hostname` | | Optional. The hostname for the Router containing this configuration file. Will be different for each machine. Defaults to current hostname. | +| Property | Example | Description | +|:------------------------------------------|:--------|:--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `yarn.router.keytab.file` | | The keytab file used by router to login as its service principal. The principal name is configured with 'yarn.router.kerberos.principal'. | +| `yarn.router.kerberos.principal` | | The Router service principal. This is typically set to router/_HOST@REALM.TLD. Each Router will substitute _HOST with its own fully qualified hostname at startup. The _HOST placeholder allows using the same configuration setting on all Routers in setup. | +| `yarn.router.kerberos.principal.hostname` | | Optional. The hostname for the Router containing this configuration file. Will be different for each machine. Defaults to current hostname. | -Enabling CORS support: +#### How to configure Router Cors support To enable cross-origin support (CORS) for the Yarn Router, please set the following configuration parameters: -| Property | Example | Description | -| ----------------------------------------- | ------------------------------------------------------------ | ------------------------------------------------------------ | +| Property | Example | Description | +|-------------------------------------------|---------------------------------------------------------------|----------------------------------------------------------------------------------------------------------| | `hadoop.http.filter.initializers` | `org.apache.hadoop.security.HttpCrossOriginFilterInitializer` | Optional. Set the filter to HttpCrossOriginFilterInitializer, Configure this parameter in core-site.xml. | -| `yarn.router.webapp.cross-origin.enabled` | `true` | Optional. Enable/disable CORS filter.Configure this parameter in yarn-site.xml. | +| `yarn.router.webapp.cross-origin.enabled` | `true` | Optional. Enable/disable CORS filter.Configure this parameter in yarn-site.xml. | -Cache: +#### How to configure Router Cache -Cache is not enabled by default. When we set the `yarn.federation.cache-ttl.secs` parameter and its value is greater than 0, Cache will be enabled. +Cache is enabled by default. 
When we set the `yarn.federation.cache-ttl.secs` parameter and its value is greater than 0, Cache will be enabled. We currently provide two Cache implementations: `JCache` and `GuavaCache`. - JCache @@ -465,13 +550,69 @@ If we want to use JCache, we can configure `yarn.federation.cache.class` to `org This is a Cache implemented based on the Guava framework. If we want to use it, we can configure `yarn.federation.cache.class` to `org.apache.hadoop.yarn.server.federation.cache.FederationGuavaCache`. -Router command line: +#### How to configure Router AuditLog -- deregisterSubCluster +We can enable the AuditLog configuration for the Router and collect the AuditLog in a separate log file. We need to modify the configuration related to RouterAuditLog in the **conf/log4j.properties** file. + +The configuration is as follows: + +``` +router.audit.logger=INFO,ROUTERAUDIT +router.audit.log.maxfilesize=256MB +router.audit.log.maxbackupindex=20 +log4j.logger.org.apache.hadoop.yarn.server.router.RouterAuditLogger=${router.audit.logger} +log4j.additivity.org.apache.hadoop.yarn.server.router.RouterAuditLogger=false +log4j.appender.ROUTERAUDIT=org.apache.log4j.RollingFileAppender +log4j.appender.ROUTERAUDIT.File=${hadoop.log.dir}/router-audit.log +log4j.appender.ROUTERAUDIT.layout=org.apache.log4j.PatternLayout +log4j.appender.ROUTERAUDIT.layout.ConversionPattern=%d{ISO8601} %p %c{2}: %m%n +log4j.appender.ROUTERAUDIT.MaxFileSize=${router.audit.log.maxfilesize} +log4j.appender.ROUTERAUDIT.MaxBackupIndex=${router.audit.log.maxbackupindex} +``` + +#### How to configure Router Opts + +If we need to modify the `HEAPSIZE` or `OPTS` for the Router, we can make changes in the `yarn-env.sh` file. + +- YARN_ROUTER_HEAPSIZE + +``` +# Specify the max heapsize for the Router. If no units are given, it will be assumed to be in MB. +# Default is the same as HADOOP_HEAPSIZE_MAX +export YARN_ROUTER_HEAPSIZE= +``` + +- YARN_ROUTER_OPTS +``` +# Specify the JVM options to be used when starting the Router. These options will be appended to the options specified as HADOOP_OPTS +# and therefore may override any similar flags set in HADOOP_OPTS +export YARN_ROUTER_OPTS="-Drouter.audit.logger=INFO,ROUTERAUDIT" +``` + +#### How to configure the client to randomly Select a Router + +By default, the client will try from the first router in the configured router list. If the connection is successful, the router will not be replaced. We can set `yarn.federation.failover.random.order` to true to allow clients to randomly select Routers. + +#### How to configure the cleanup of expired subClusters + +We allow the Router to initiate a separate thread for periodically monitoring the status of all subClusters. If a subCluster's heartbeat exceeds a certain time threshold, we categorize that subCluster as "LOST". + +| Property | Example | Description | +|:---------------------------------------------------|:--------|:--------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `yarn.router.deregister.subcluster.enable` | `true` | Whether to enable the capability for automatic subCluster offline. Default is true. | +| `yarn.router.subcluster.heartbeat.expiration.time` | `30m` | The default subCluster heartbeat timeout time is 30 minutes. 
| +| `yarn.router.scheduled.executor.threads` | `1` | The number of threads started to check for subCluster timeouts has a default value of 1, and using the default value is sufficient for meeting the checking requirements. | +| `yarn.router.subcluster.cleaner.interval.time` | `60s` | The check thread's checking interval. Default 60s. | + +**Note** We don't need to configure the subCluster deregister checking threads for all Routers; using 1-2 Routers for checking is sufficient. + +#### How to use Router Command Line + +##### Cmd1: deregisterSubCluster This command is used to `deregister subCluster`, If the interval between the heartbeat time of the subCluster, and the current time exceeds the timeout period, set the state of the subCluster to `SC_LOST`. -Uasge: +Usage: `yarn routeradmin -deregisterSubCluster [-sc|--subClusterId ]` @@ -485,14 +626,16 @@ Examples: If we want to deregisterSubCluster `SC-1` -- yarn routeradmin -deregisterSubCluster -sc SC-1 -- yarn routeradmin -deregisterSubCluster --subClusterId SC-1 +``` + yarn routeradmin -deregisterSubCluster -sc SC-1 + yarn routeradmin -deregisterSubCluster --subClusterId SC-1 +``` -- policy +##### Cmd2: policy We provide a set of commands for Policy Include list policies, save policies, batch save policies. -Uasge: +Usage: `yarn routeradmin -policy -s|--save (queue;router weight;amrm weight;headroomalpha)` @@ -500,7 +643,7 @@ Uasge: `yarn routeradmin -policy -l|--list ([--pageSize][--currentPage][--queue][--queues])` -- -s|--save () +###### SubCmd1: -s|--save () This command is used to save the policy information of the queue, including queue and weight information. @@ -520,11 +663,13 @@ Example: We have two sub-clusters, `SC-1` and `SC-2`. We want to configure a weight policy for the `root.a` queue. The Router Weight is set to `SC-1` with a weight of `0.7` and `SC-2` with a weight of `0.3`. The AMRM Weight is set `SC-1` to `0.6` and `SC-2` to `0.4`. We are using the default value of `0.1` for `headroomalpha`. +``` yarn routeradmin -policy --save root.a;SC-1:0.7,SC-2:0.3;SC-1:0.6,SC-2:0.4;1.0 yarn routeradmin -policy -s root.a;SC-1:0.7,SC-2:0.3;SC-1:0.6,SC-2:0.4;1.0 +``` -- -bs|--batch-save (--format xml) (-f|--input-file fileName) +###### SubCmd2: -bs|--batch-save (--format xml) (-f|--input-file fileName) This command can batch load weight information for queues based on the provided `federation-weights.xml` file. @@ -534,7 +679,7 @@ This command can batch load weight information for queues based on the provided | `-f, --input-file [path]` | `The path to the configuration file. Please use the absolute path of the configuration file.` | How to configure `federation-weights.xml` - ```xml +```xml @@ -589,7 +734,7 @@ How to configure `federation-weights.xml` - ``` +``` Example: @@ -598,11 +743,13 @@ and then use the batch-save command to save the configurations in bulk. The file name can be any file name, but it is recommended to use `federation-weights.xml` +``` yarn routeradmin -policy -bs --format xml -f /path/federation-weights.xml yarn routeradmin -policy --batch-save --format xml -f /path/federation-weights.xml +``` -- -l|--list (--pageSize --currentPage --queue --queues) +###### SubCmd3: -l|--list (--pageSize --currentPage --queue --queues) This command is used to display the configured queue weight information. @@ -617,13 +764,15 @@ Example: We can display the list of already configured queue weight information. 
We can use the `--queue` option to query the weight information for a specific queue or use the `--queues` option to query the weight information for multiple queues. +``` yarn routeradmin -policy -l --pageSize 20 --currentPage 1 --queue root.a yarn routeradmin -policy -list --pageSize 20 --currentPage 1 --queues root.a,root.b +``` ### ON GPG: -GlobalPolicyGenerator, abbreviated as "GPG", is used for the automatic generation of global policies for subClusters. +GlobalPolicyGenerator, abbreviated as “GPG”, is used for the automatic generation of global policies for subClusters. The functionality of GPG is still under development and not yet complete. It is not recommended for use in a production environment. These are extra configurations that should appear in the **conf/yarn-site.xml** for GPG. We allow only one GPG. @@ -643,7 +792,7 @@ Optional: | `yarn.federation.gpg.policy.generator.blacklist` | | Which sub-clusters the policy generator should blacklist. | | `yarn.federation.gpg.policy.generator.load-based.pending.minimum` | `100` | The minimum number of pending applications in the subCluster. | | `yarn.federation.gpg.policy.generator.load-based.pending.maximum` | `1000` | The maximum number of pending applications in the subCluster. | -| `yarn.federation.gpg.policy.generator.load-based.weight.minimum` | `0` | If a subCluster has a very high load, we will assign this value to the subCluster. The default value is 0, which means that we no longer assign appliaction to this subCluster. | +| `yarn.federation.gpg.policy.generator.load-based.weight.minimum` | `0` | If a subCluster has a very high load, we will assign this value to the subCluster. The default value is 0, which means that we no longer assign application to this subCluster. | | `yarn.federation.gpg.policy.generator.load-based.edit.maximum` | `3` | This value represents the number of subClusters we want to calculate. default is 3. | | `yarn.federation.gpg.policy.generator.load-based.scaling` | `LINEAR` | We provide 4 calculation methods: NONE, LINEAR, QUADRATIC, LOG. | | `yarn.federation.gpg.webapp.address` | `0.0.0.0:8069` | The address of the GPG web application. | @@ -683,9 +832,10 @@ No calculation is required, and the weight is 1 at this time. LINEAR is used by default. -Security: +**Note** +It is not recommended to use GPG's capability to clean up expired applications in a production environment as this feature is still undergoing further development. -Kerberos supported in GPG. +#### How to configure GPG security | Property | Example | Description | |:--------------------------------------------------|:--------|:----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| @@ -693,7 +843,7 @@ Kerberos supported in GPG. | `yarn.federation.gpg.kerberos.principal` | | The GPG service principal. This is typically set to GPG/_HOST@REALM.TLD. GPG will substitute _HOST with its own fully qualified hostname at startup. The _HOST placeholder allows using the same configuration setting on GPG in setup. | | `yarn.federation.gpg.kerberos.principal.hostname` | | Optional. The hostname for the GPG containing this configuration file. Will be different for each machine. Defaults to current hostname. 
| -Enabling CORS support: +#### How to configure GPG Cors support To enable cross-origin support (CORS) for the Yarn Router, please set the following configuration parameters: @@ -702,32 +852,52 @@ To enable cross-origin support (CORS) for the Yarn Router, please set the follow | `hadoop.http.filter.initializers` | `org.apache.hadoop.security.HttpCrossOriginFilterInitializer` | Optional. Set the filter to HttpCrossOriginFilterInitializer, Configure this parameter in core-site.xml. | | `yarn.federation.gpg.webapp.cross-origin.enabled` | `true` | Optional. Enable/disable CORS filter.Configure this parameter in yarn-site.xml. | +#### How to configure GPG Opts + +If we need to modify the `HEAPSIZE` or `OPTS` for the GPG, we can make changes in the `yarn-env.sh` file. + +- YARN_GLOBALPOLICYGENERATOR_HEAPSIZE +``` +# Specify the max heapsize for the Global Policy Generator. +# If no units are given, it will be assumed to be in MB. Default is the same as HADOOP_HEAPSIZE_MAX +# export YARN_GLOBALPOLICYGENERATOR_HEAPSIZE= +``` + +- YARN_GLOBALPOLICYGENERATOR_OPTS +``` +# Specify the JVM options to be used when starting the GPG. +# These options will be appended to the options specified as HADOOP_OPTS and therefore may override any similar flags set in HADOOP_OPTS +# +# See ResourceManager for some examples +# export YARN_GLOBALPOLICYGENERATOR_OPTS= +``` + ### ON NMs: These are extra configurations that should appear in the **conf/yarn-site.xml** at each NodeManager. -| Property | Example | Description | -|:---- |:---- |:---- | -| `yarn.nodemanager.amrmproxy.enabled` | `true` | Whether or not the AMRMProxy is enabled. | +| Property | Example | Description | +|:--------------------------------------------------------|:----------------------------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------| +| `yarn.nodemanager.amrmproxy.enabled` | `true` | Whether or not the AMRMProxy is enabled. | | `yarn.nodemanager.amrmproxy.interceptor-class.pipeline` | `org.apache.hadoop.yarn.server.nodemanager.amrmproxy.FederationInterceptor` | A comma-separated list of interceptors to be run at the amrmproxy. For federation the last step in the pipeline should be the FederationInterceptor. | Optional: -| Property | Example | Description | -|:---- |:---- |:---- | -| `yarn.nodemanager.amrmproxy.ha.enable` | `true` | Whether or not the AMRMProxy HA is enabled for multiple application attempt support. | -| `yarn.federation.statestore.max-connections` | `1` | The maximum number of parallel connections from each AMRMProxy to the state-store. This value is typically lower than the router one, since we have many AMRMProxy that could burn-through many DB connections quickly. | -| `yarn.federation.cache-ttl.secs` | `300` | The time to leave for the AMRMProxy cache. Typically larger than at the router, as the number of AMRMProxy is large, and we want to limit the load to the centralized state-store. | +| Property | Example | Description | +|:---------------------------------------------|:--------|:------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `yarn.nodemanager.amrmproxy.ha.enable` | `true` | Whether or not the AMRMProxy HA is enabled for multiple application attempt support. 
| +| `yarn.federation.statestore.max-connections` | `1` | The maximum number of parallel connections from each AMRMProxy to the state-store. This value is typically lower than the router one, since we have many AMRMProxy that could burn-through many DB connections quickly. | +| `yarn.federation.cache-ttl.secs` | `300` | The time to leave for the AMRMProxy cache. Typically larger than at the router, as the number of AMRMProxy is large, and we want to limit the load to the centralized state-store. | Running a Sample Job -------------------- In order to submit jobs to a Federation cluster one must create a separate set of configs for the client from which jobs will be submitted. In these, the **conf/yarn-site.xml** should have the following additional configurations: -| Property | Example | Description | -|:--- |:--- |:---- | -| `yarn.resourcemanager.address` | `:8050` | Redirects jobs launched at the client to the router's client RM port. | -| `yarn.resourcemanager.scheduler.address` | `localhost:8049` | Redirects jobs to the federation AMRMProxy port.| +| Property | Example | Description | +|:-----------------------------------------|:---------------------|:----------------------------------------------------------------------| +| `yarn.resourcemanager.address` | `:8050` | Redirects jobs launched at the client to the router's client RM port. | +| `yarn.resourcemanager.scheduler.address` | `localhost:8049` | Redirects jobs to the federation AMRMProxy port. | Any YARN jobs for the cluster can be submitted from the client configurations described above. In order to launch a job through federation, first start up all the clusters involved in the federation as described [here](../../hadoop-project-dist/hadoop-common/ClusterSetup.html). Next, start up the router on the router machine with the following command: @@ -865,7 +1035,7 @@ $HADOOP_HOME/bin/yarn --daemon start resourcemanager true - + yarn.federation.failover.enabled false @@ -994,7 +1164,7 @@ $HADOOP_HOME/bin/yarn --daemon start router true - + yarn.federation.failover.enabled false