HADOOP-19156. ZooKeeper based state stores use different ZK address configs. (#6767). Contributed by liu bin.
Signed-off-by: Ayush Saxena <ayushsaxena@apache.org> Signed-off-by: He Xiaoqiao <hexiaoqiao@apache.org>
This commit is contained in:
parent
f4fde40524
commit
6c08e8e2aa
@ -126,7 +126,7 @@ public static List<ZKUtil.ZKAuthInfo> getZKAuths(Configuration conf)
|
|||||||
* Start the connection to the ZooKeeper ensemble.
|
* Start the connection to the ZooKeeper ensemble.
|
||||||
* @throws IOException If the connection cannot be started.
|
* @throws IOException If the connection cannot be started.
|
||||||
*/
|
*/
|
||||||
public void start() throws IOException{
|
public void start() throws IOException {
|
||||||
this.start(new ArrayList<>());
|
this.start(new ArrayList<>());
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -139,25 +139,47 @@ public void start(List<AuthInfo> authInfos) throws IOException {
|
|||||||
this.start(authInfos, false);
|
this.start(authInfos, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Start the connection to the ZooKeeper ensemble.
|
||||||
|
* @param zkHostPort Host:Port of the ZooKeeper.
|
||||||
|
* @throws IOException If the connection cannot be started.
|
||||||
|
*/
|
||||||
|
public void start(String zkHostPort) throws IOException {
|
||||||
|
this.start(new ArrayList<>(), false, zkHostPort);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Start the connection to the ZooKeeper ensemble.
|
||||||
|
* @param authInfos List of authentication keys.
|
||||||
|
* @param sslEnabled If the connection should be SSL/TLS encrypted.
|
||||||
|
* @throws IOException If the connection cannot be started.
|
||||||
|
*/
|
||||||
|
public void start(List<AuthInfo> authInfos, boolean sslEnabled) throws IOException {
|
||||||
|
this.start(authInfos, sslEnabled, null);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Start the connection to the ZooKeeper ensemble.
|
* Start the connection to the ZooKeeper ensemble.
|
||||||
*
|
*
|
||||||
* @param authInfos List of authentication keys.
|
* @param authInfos List of authentication keys.
|
||||||
* @param sslEnabled If the connection should be SSL/TLS encrypted.
|
* @param sslEnabled If the connection should be SSL/TLS encrypted.
|
||||||
|
* @param zkHostPort Host:Port of the ZooKeeper.
|
||||||
* @throws IOException If the connection cannot be started.
|
* @throws IOException If the connection cannot be started.
|
||||||
*/
|
*/
|
||||||
public void start(List<AuthInfo> authInfos, boolean sslEnabled)
|
public void start(List<AuthInfo> authInfos, boolean sslEnabled, String zkHostPort)
|
||||||
throws IOException{
|
throws IOException {
|
||||||
|
|
||||||
ZKClientConfig zkClientConfig = new ZKClientConfig();
|
ZKClientConfig zkClientConfig = new ZKClientConfig();
|
||||||
|
|
||||||
// Connect to the ZooKeeper ensemble
|
// Connect to the ZooKeeper ensemble
|
||||||
String zkHostPort = conf.get(CommonConfigurationKeys.ZK_ADDRESS);
|
|
||||||
if (zkHostPort == null) {
|
if (zkHostPort == null) {
|
||||||
throw new IOException(
|
zkHostPort = conf.get(CommonConfigurationKeys.ZK_ADDRESS);
|
||||||
CommonConfigurationKeys.ZK_ADDRESS + " is not configured.");
|
if (zkHostPort == null) {
|
||||||
|
throw new IOException(
|
||||||
|
CommonConfigurationKeys.ZK_ADDRESS + " is not configured.");
|
||||||
|
}
|
||||||
|
LOG.debug("Configured {} as {}", CommonConfigurationKeys.ZK_ADDRESS, zkHostPort);
|
||||||
}
|
}
|
||||||
LOG.debug("Configured {} as {}", CommonConfigurationKeys.ZK_ADDRESS, zkHostPort);
|
|
||||||
|
|
||||||
int numRetries = conf.getInt(CommonConfigurationKeys.ZK_NUM_RETRIES,
|
int numRetries = conf.getInt(CommonConfigurationKeys.ZK_NUM_RETRIES,
|
||||||
CommonConfigurationKeys.ZK_NUM_RETRIES_DEFAULT);
|
CommonConfigurationKeys.ZK_NUM_RETRIES_DEFAULT);
|
||||||
|
@ -71,9 +71,9 @@ public void setup() throws Exception {
|
|||||||
DELETE_DATA_DIRECTORY_ON_CLOSE, SERVER_ID, TICK_TIME, MAX_CLIENT_CNXNS,
|
DELETE_DATA_DIRECTORY_ON_CLOSE, SERVER_ID, TICK_TIME, MAX_CLIENT_CNXNS,
|
||||||
customConfiguration);
|
customConfiguration);
|
||||||
this.server = new TestingServer(spec, true);
|
this.server = new TestingServer(spec, true);
|
||||||
this.hadoopConf.set(CommonConfigurationKeys.ZK_ADDRESS, this.server.getConnectString());
|
String zkHostPort = this.server.getConnectString();
|
||||||
this.curator = new ZKCuratorManager(this.hadoopConf);
|
this.curator = new ZKCuratorManager(this.hadoopConf);
|
||||||
this.curator.start(new ArrayList<>(), true);
|
this.curator.start(new ArrayList<>(), true, zkHostPort);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -60,11 +60,10 @@ public void setup() throws Exception {
|
|||||||
this.server = new TestingServer();
|
this.server = new TestingServer();
|
||||||
|
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
conf.set(
|
String zkHostPort = this.server.getConnectString();
|
||||||
CommonConfigurationKeys.ZK_ADDRESS, this.server.getConnectString());
|
|
||||||
|
|
||||||
this.curator = new ZKCuratorManager(conf);
|
this.curator = new ZKCuratorManager(conf);
|
||||||
this.curator.start();
|
this.curator.start(zkHostPort);
|
||||||
}
|
}
|
||||||
|
|
||||||
@After
|
@After
|
||||||
|
@ -260,6 +260,8 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic {
|
|||||||
FEDERATION_STORE_ZK_DRIVER_PREFIX + "async.max.threads";
|
FEDERATION_STORE_ZK_DRIVER_PREFIX + "async.max.threads";
|
||||||
public static final int FEDERATION_STORE_ZK_ASYNC_MAX_THREADS_DEFAULT =
|
public static final int FEDERATION_STORE_ZK_ASYNC_MAX_THREADS_DEFAULT =
|
||||||
-1;
|
-1;
|
||||||
|
public static final String FEDERATION_STORE_ZK_ADDRESS =
|
||||||
|
FEDERATION_STORE_ZK_DRIVER_PREFIX + "address";
|
||||||
|
|
||||||
// HDFS Router-based federation File based store implementation specific configs
|
// HDFS Router-based federation File based store implementation specific configs
|
||||||
public static final String FEDERATION_STORE_FILE_ASYNC_THREADS =
|
public static final String FEDERATION_STORE_FILE_ASYNC_THREADS =
|
||||||
|
@ -108,9 +108,10 @@ public boolean initDriver() {
|
|||||||
} else {
|
} else {
|
||||||
LOG.info("Init StateStoreZookeeperImpl by sync mode.");
|
LOG.info("Init StateStoreZookeeperImpl by sync mode.");
|
||||||
}
|
}
|
||||||
|
String zkHostPort = conf.get(RBFConfigKeys.FEDERATION_STORE_ZK_ADDRESS);
|
||||||
try {
|
try {
|
||||||
this.zkManager = new ZKCuratorManager(conf);
|
this.zkManager = new ZKCuratorManager(conf);
|
||||||
this.zkManager.start();
|
this.zkManager.start(zkHostPort);
|
||||||
this.zkAcl = ZKCuratorManager.getZKAcls(conf);
|
this.zkAcl = ZKCuratorManager.getZKAcls(conf);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
LOG.error("Cannot initialize the ZK connection", e);
|
LOG.error("Cannot initialize the ZK connection", e);
|
||||||
|
@ -388,6 +388,13 @@
|
|||||||
</description>
|
</description>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>dfs.federation.router.store.driver.zk.address</name>
|
||||||
|
<description>
|
||||||
|
Host:Port of the ZooKeeper for StateStoreZooKeeperImpl.
|
||||||
|
</description>
|
||||||
|
</property>
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<name>dfs.federation.router.store.driver.zk.parent-path</name>
|
<name>dfs.federation.router.store.driver.zk.parent-path</name>
|
||||||
<value>/hdfs-federation</value>
|
<value>/hdfs-federation</value>
|
||||||
|
@ -22,7 +22,6 @@
|
|||||||
import org.apache.curator.retry.RetryNTimes;
|
import org.apache.curator.retry.RetryNTimes;
|
||||||
import org.apache.curator.test.TestingServer;
|
import org.apache.curator.test.TestingServer;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
|
||||||
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
|
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
|
||||||
import org.apache.hadoop.hdfs.server.federation.store.RouterStore;
|
import org.apache.hadoop.hdfs.server.federation.store.RouterStore;
|
||||||
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
|
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
|
||||||
@ -74,7 +73,7 @@ public void setup() throws Exception {
|
|||||||
.retryPolicy(new RetryNTimes(100, 100))
|
.retryPolicy(new RetryNTimes(100, 100))
|
||||||
.build();
|
.build();
|
||||||
curatorFramework.start();
|
curatorFramework.start();
|
||||||
routerConfig.set(CommonConfigurationKeys.ZK_ADDRESS, connectStr);
|
routerConfig.set(RBFConfigKeys.FEDERATION_STORE_ZK_ADDRESS, connectStr);
|
||||||
router.init(routerConfig);
|
router.init(routerConfig);
|
||||||
router.start();
|
router.start();
|
||||||
|
|
||||||
|
@ -32,7 +32,6 @@
|
|||||||
|
|
||||||
import org.apache.curator.test.TestingServer;
|
import org.apache.curator.test.TestingServer;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
|
||||||
import org.apache.hadoop.hdfs.server.federation.FederationTestUtils;
|
import org.apache.hadoop.hdfs.server.federation.FederationTestUtils;
|
||||||
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
|
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
|
||||||
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
|
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
|
||||||
@ -81,7 +80,7 @@ public static void setUp() throws Exception {
|
|||||||
conf.setClass(RBFConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS,
|
conf.setClass(RBFConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS,
|
||||||
RBFConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS_DEFAULT,
|
RBFConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS_DEFAULT,
|
||||||
FileSubclusterResolver.class);
|
FileSubclusterResolver.class);
|
||||||
conf.set(CommonConfigurationKeys.ZK_ADDRESS, connectString);
|
conf.set(RBFConfigKeys.FEDERATION_STORE_ZK_ADDRESS, connectString);
|
||||||
conf.setBoolean(RBFConfigKeys.DFS_ROUTER_STORE_ENABLE, true);
|
conf.setBoolean(RBFConfigKeys.DFS_ROUTER_STORE_ENABLE, true);
|
||||||
cluster.addRouterOverrides(conf);
|
cluster.addRouterOverrides(conf);
|
||||||
cluster.startCluster();
|
cluster.startCluster();
|
||||||
|
@ -30,7 +30,6 @@
|
|||||||
|
|
||||||
import org.apache.curator.test.TestingServer;
|
import org.apache.curator.test.TestingServer;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
|
||||||
import org.apache.hadoop.hdfs.server.federation.FederationTestUtils;
|
import org.apache.hadoop.hdfs.server.federation.FederationTestUtils;
|
||||||
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
|
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
|
||||||
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
|
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
|
||||||
@ -86,7 +85,7 @@ public static void setUp() throws Exception {
|
|||||||
conf.setClass(RBFConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS,
|
conf.setClass(RBFConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS,
|
||||||
RBFConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS_DEFAULT,
|
RBFConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS_DEFAULT,
|
||||||
FileSubclusterResolver.class);
|
FileSubclusterResolver.class);
|
||||||
conf.set(CommonConfigurationKeys.ZK_ADDRESS, connectString);
|
conf.set(RBFConfigKeys.FEDERATION_STORE_ZK_ADDRESS, connectString);
|
||||||
conf.setBoolean(RBFConfigKeys.DFS_ROUTER_STORE_ENABLE, true);
|
conf.setBoolean(RBFConfigKeys.DFS_ROUTER_STORE_ENABLE, true);
|
||||||
cluster = new MiniRouterDFSCluster(false, numNameservices, conf);
|
cluster = new MiniRouterDFSCluster(false, numNameservices, conf);
|
||||||
cluster.addRouterOverrides(conf);
|
cluster.addRouterOverrides(conf);
|
||||||
|
@ -32,7 +32,6 @@
|
|||||||
import org.apache.curator.retry.RetryNTimes;
|
import org.apache.curator.retry.RetryNTimes;
|
||||||
import org.apache.curator.test.TestingServer;
|
import org.apache.curator.test.TestingServer;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
|
||||||
import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
|
import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils;
|
import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils;
|
||||||
import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreZooKeeperImpl;
|
import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreZooKeeperImpl;
|
||||||
@ -71,7 +70,7 @@ public static void setupCluster() throws Exception {
|
|||||||
// Create the ZK State Store
|
// Create the ZK State Store
|
||||||
Configuration conf =
|
Configuration conf =
|
||||||
getStateStoreConfiguration(StateStoreZooKeeperImpl.class);
|
getStateStoreConfiguration(StateStoreZooKeeperImpl.class);
|
||||||
conf.set(CommonConfigurationKeys.ZK_ADDRESS, connectString);
|
conf.set(RBFConfigKeys.FEDERATION_STORE_ZK_ADDRESS, connectString);
|
||||||
// Disable auto-repair of connection
|
// Disable auto-repair of connection
|
||||||
conf.setLong(RBFConfigKeys.FEDERATION_STORE_CONNECTION_TEST_MS,
|
conf.setLong(RBFConfigKeys.FEDERATION_STORE_CONNECTION_TEST_MS,
|
||||||
TimeUnit.HOURS.toMillis(1));
|
TimeUnit.HOURS.toMillis(1));
|
||||||
|
@ -112,8 +112,6 @@ private static void addDeprecatedKeys() {
|
|||||||
SYSTEM_METRICS_PUBLISHER_ENABLED),
|
SYSTEM_METRICS_PUBLISHER_ENABLED),
|
||||||
new DeprecationDelta(RM_ZK_ACL, CommonConfigurationKeys.ZK_ACL),
|
new DeprecationDelta(RM_ZK_ACL, CommonConfigurationKeys.ZK_ACL),
|
||||||
new DeprecationDelta(RM_ZK_AUTH, CommonConfigurationKeys.ZK_AUTH),
|
new DeprecationDelta(RM_ZK_AUTH, CommonConfigurationKeys.ZK_AUTH),
|
||||||
new DeprecationDelta(RM_ZK_ADDRESS,
|
|
||||||
CommonConfigurationKeys.ZK_ADDRESS),
|
|
||||||
new DeprecationDelta(RM_ZK_NUM_RETRIES,
|
new DeprecationDelta(RM_ZK_NUM_RETRIES,
|
||||||
CommonConfigurationKeys.ZK_NUM_RETRIES),
|
CommonConfigurationKeys.ZK_NUM_RETRIES),
|
||||||
new DeprecationDelta(RM_ZK_TIMEOUT_MS,
|
new DeprecationDelta(RM_ZK_TIMEOUT_MS,
|
||||||
@ -4038,6 +4036,9 @@ public static boolean isAclEnabled(Configuration conf) {
|
|||||||
public static final String DEFAULT_FEDERATION_STATESTORE_CLIENT_CLASS =
|
public static final String DEFAULT_FEDERATION_STATESTORE_CLIENT_CLASS =
|
||||||
"org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore";
|
"org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore";
|
||||||
|
|
||||||
|
public static final String FEDERATION_STATESTORE_ZK_ADDRESS =
|
||||||
|
FEDERATION_PREFIX + "state-store.zk.address";
|
||||||
|
|
||||||
public static final String FEDERATION_CACHE_TIME_TO_LIVE_SECS =
|
public static final String FEDERATION_CACHE_TIME_TO_LIVE_SECS =
|
||||||
FEDERATION_PREFIX + "cache-ttl.secs";
|
FEDERATION_PREFIX + "cache-ttl.secs";
|
||||||
// 5 minutes
|
// 5 minutes
|
||||||
|
@ -142,7 +142,6 @@ public void initializeMemberVariables() {
|
|||||||
.add(YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_ENABLED);
|
.add(YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_ENABLED);
|
||||||
|
|
||||||
// skip deprecated ZooKeeper settings
|
// skip deprecated ZooKeeper settings
|
||||||
configurationPropsToSkipCompare.add(YarnConfiguration.RM_ZK_ADDRESS);
|
|
||||||
configurationPropsToSkipCompare.add(YarnConfiguration.RM_ZK_NUM_RETRIES);
|
configurationPropsToSkipCompare.add(YarnConfiguration.RM_ZK_NUM_RETRIES);
|
||||||
configurationPropsToSkipCompare.add(YarnConfiguration.RM_ZK_TIMEOUT_MS);
|
configurationPropsToSkipCompare.add(YarnConfiguration.RM_ZK_TIMEOUT_MS);
|
||||||
configurationPropsToSkipCompare.add(
|
configurationPropsToSkipCompare.add(
|
||||||
|
@ -581,6 +581,15 @@
|
|||||||
<value>${yarn.resourcemanager.max-completed-applications}</value>
|
<value>${yarn.resourcemanager.max-completed-applications}</value>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<description>Host:Port of the ZooKeeper server to be used by the RM. This
|
||||||
|
must be supplied when using the ZooKeeper based implementation of the
|
||||||
|
RM state store and/or embedded automatic failover in an HA setting.
|
||||||
|
</description>
|
||||||
|
<name>yarn.resourcemanager.zk-address</name>
|
||||||
|
<!--value>127.0.0.1:2181</value-->
|
||||||
|
</property>
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<description>Full path of the ZooKeeper znode where RM state will be
|
<description>Full path of the ZooKeeper znode where RM state will be
|
||||||
stored. This must be supplied when using
|
stored. This must be supplied when using
|
||||||
@ -3798,6 +3807,13 @@
|
|||||||
<value>org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore</value>
|
<value>org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore</value>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<description>
|
||||||
|
Host:Port of the ZooKeeper server to be used by the federation state store
|
||||||
|
</description>
|
||||||
|
<name>yarn.federation.state-store.zk.address</name>
|
||||||
|
</property>
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<description>
|
<description>
|
||||||
The time in seconds after which the federation state store local cache
|
The time in seconds after which the federation state store local cache
|
||||||
|
@ -265,9 +265,10 @@ public void init(Configuration conf) throws YarnException {
|
|||||||
baseZNode = conf.get(
|
baseZNode = conf.get(
|
||||||
YarnConfiguration.FEDERATION_STATESTORE_ZK_PARENT_PATH,
|
YarnConfiguration.FEDERATION_STATESTORE_ZK_PARENT_PATH,
|
||||||
YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_ZK_PARENT_PATH);
|
YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_ZK_PARENT_PATH);
|
||||||
|
String zkHostPort = conf.get(YarnConfiguration.FEDERATION_STATESTORE_ZK_ADDRESS);
|
||||||
try {
|
try {
|
||||||
this.zkManager = new ZKCuratorManager(conf);
|
this.zkManager = new ZKCuratorManager(conf);
|
||||||
this.zkManager.start();
|
this.zkManager.start(zkHostPort);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
LOG.error("Cannot initialize the ZK connection", e);
|
LOG.error("Cannot initialize the ZK connection", e);
|
||||||
}
|
}
|
||||||
|
@ -27,7 +27,6 @@
|
|||||||
import org.apache.curator.retry.RetryNTimes;
|
import org.apache.curator.retry.RetryNTimes;
|
||||||
import org.apache.curator.test.TestingServer;
|
import org.apache.curator.test.TestingServer;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
|
||||||
import org.apache.hadoop.metrics2.MetricsRecord;
|
import org.apache.hadoop.metrics2.MetricsRecord;
|
||||||
import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl;
|
import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl;
|
||||||
import org.apache.hadoop.metrics2.impl.MetricsRecords;
|
import org.apache.hadoop.metrics2.impl.MetricsRecords;
|
||||||
@ -94,7 +93,7 @@ public void before() throws IOException, YarnException {
|
|||||||
curatorFramework.start();
|
curatorFramework.start();
|
||||||
|
|
||||||
Configuration conf = new YarnConfiguration();
|
Configuration conf = new YarnConfiguration();
|
||||||
conf.set(CommonConfigurationKeys.ZK_ADDRESS, connectString);
|
conf.set(YarnConfiguration.FEDERATION_STATESTORE_ZK_ADDRESS, connectString);
|
||||||
conf.setInt(YarnConfiguration.FEDERATION_STATESTORE_MAX_APPLICATIONS, 10);
|
conf.setInt(YarnConfiguration.FEDERATION_STATESTORE_MAX_APPLICATIONS, 10);
|
||||||
setConf(conf);
|
setConf(conf);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
|
@ -410,6 +410,7 @@ protected EmbeddedElector createEmbeddedElector() throws IOException {
|
|||||||
*/
|
*/
|
||||||
public ZKCuratorManager createAndStartZKManager(Configuration
|
public ZKCuratorManager createAndStartZKManager(Configuration
|
||||||
config) throws IOException {
|
config) throws IOException {
|
||||||
|
String zkHostPort = config.get(YarnConfiguration.RM_ZK_ADDRESS);
|
||||||
ZKCuratorManager manager = new ZKCuratorManager(config);
|
ZKCuratorManager manager = new ZKCuratorManager(config);
|
||||||
|
|
||||||
// Get authentication
|
// Get authentication
|
||||||
@ -432,7 +433,7 @@ public ZKCuratorManager createAndStartZKManager(Configuration
|
|||||||
config.getBoolean(CommonConfigurationKeys.ZK_CLIENT_SSL_ENABLED,
|
config.getBoolean(CommonConfigurationKeys.ZK_CLIENT_SSL_ENABLED,
|
||||||
config.getBoolean(YarnConfiguration.RM_ZK_CLIENT_SSL_ENABLED,
|
config.getBoolean(YarnConfiguration.RM_ZK_CLIENT_SSL_ENABLED,
|
||||||
YarnConfiguration.DEFAULT_RM_ZK_CLIENT_SSL_ENABLED));
|
YarnConfiguration.DEFAULT_RM_ZK_CLIENT_SSL_ENABLED));
|
||||||
manager.start(authInfos, isSSLEnabled);
|
manager.start(authInfos, isSSLEnabled, zkHostPort);
|
||||||
return manager;
|
return manager;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -64,7 +64,7 @@ public void setup() throws Exception {
|
|||||||
configuration.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
|
configuration.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
|
||||||
configuration.set(YarnConfiguration.RM_STORE,
|
configuration.set(YarnConfiguration.RM_STORE,
|
||||||
ZKRMStateStore.class.getName());
|
ZKRMStateStore.class.getName());
|
||||||
configuration.set(CommonConfigurationKeys.ZK_ADDRESS, hostPort);
|
configuration.set(YarnConfiguration.RM_ZK_ADDRESS, hostPort);
|
||||||
configuration.setInt(CommonConfigurationKeys.ZK_TIMEOUT_MS, ZK_TIMEOUT_MS);
|
configuration.setInt(CommonConfigurationKeys.ZK_TIMEOUT_MS, ZK_TIMEOUT_MS);
|
||||||
configuration.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
|
configuration.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
|
||||||
configuration.set(YarnConfiguration.RM_CLUSTER_ID, "test-yarn-cluster");
|
configuration.set(YarnConfiguration.RM_CLUSTER_ID, "test-yarn-cluster");
|
||||||
|
@ -213,7 +213,7 @@ private String getDelegationTokenNode(int rmDTSequenceNumber, int splitIdx) {
|
|||||||
|
|
||||||
private RMStateStore createStore(Configuration conf) throws Exception {
|
private RMStateStore createStore(Configuration conf) throws Exception {
|
||||||
workingZnode = "/jira/issue/3077/rmstore";
|
workingZnode = "/jira/issue/3077/rmstore";
|
||||||
conf.set(CommonConfigurationKeys.ZK_ADDRESS,
|
conf.set(YarnConfiguration.RM_ZK_ADDRESS,
|
||||||
curatorTestingServer.getConnectString());
|
curatorTestingServer.getConnectString());
|
||||||
conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode);
|
conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode);
|
||||||
conf.setLong(YarnConfiguration.RM_EPOCH, epoch);
|
conf.setLong(YarnConfiguration.RM_EPOCH, epoch);
|
||||||
@ -347,7 +347,7 @@ public Version getCurrentVersion() throws Exception {
|
|||||||
public RMStateStore getRMStateStore() throws Exception {
|
public RMStateStore getRMStateStore() throws Exception {
|
||||||
YarnConfiguration conf = new YarnConfiguration();
|
YarnConfiguration conf = new YarnConfiguration();
|
||||||
workingZnode = "/jira/issue/3077/rmstore";
|
workingZnode = "/jira/issue/3077/rmstore";
|
||||||
conf.set(CommonConfigurationKeys.ZK_ADDRESS,
|
conf.set(YarnConfiguration.RM_ZK_ADDRESS,
|
||||||
curatorTestingServer.getConnectString());
|
curatorTestingServer.getConnectString());
|
||||||
conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode);
|
conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode);
|
||||||
this.store = new TestZKRMStateStoreInternal(conf, workingZnode) {
|
this.store = new TestZKRMStateStoreInternal(conf, workingZnode) {
|
||||||
@ -388,7 +388,7 @@ public static Configuration createHARMConf(String rmIds, String rmId,
|
|||||||
conf.set(YarnConfiguration.RM_HA_IDS, rmIds);
|
conf.set(YarnConfiguration.RM_HA_IDS, rmIds);
|
||||||
conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
|
conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
|
||||||
conf.set(YarnConfiguration.RM_STORE, ZKRMStateStore.class.getName());
|
conf.set(YarnConfiguration.RM_STORE, ZKRMStateStore.class.getName());
|
||||||
conf.set(CommonConfigurationKeys.ZK_ADDRESS,
|
conf.set(YarnConfiguration.RM_ZK_ADDRESS,
|
||||||
curatorTestServer.getConnectString());
|
curatorTestServer.getConnectString());
|
||||||
conf.setInt(CommonConfigurationKeys.ZK_TIMEOUT_MS, ZK_TIMEOUT_MS);
|
conf.setInt(CommonConfigurationKeys.ZK_TIMEOUT_MS, ZK_TIMEOUT_MS);
|
||||||
conf.set(YarnConfiguration.RM_HA_ID, rmId);
|
conf.set(YarnConfiguration.RM_HA_ID, rmId);
|
||||||
|
@ -88,7 +88,7 @@ public TestZKRMStateStore(Configuration conf, String workingZnode)
|
|||||||
|
|
||||||
public RMStateStore getRMStateStore(Configuration conf) throws Exception {
|
public RMStateStore getRMStateStore(Configuration conf) throws Exception {
|
||||||
String workingZnode = "/Test";
|
String workingZnode = "/Test";
|
||||||
conf.set(CommonConfigurationKeys.ZK_ADDRESS,
|
conf.set(YarnConfiguration.RM_ZK_ADDRESS,
|
||||||
testingServer.getConnectString());
|
testingServer.getConnectString());
|
||||||
conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode);
|
conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode);
|
||||||
this.store = new TestZKRMStateStore(conf, workingZnode);
|
this.store = new TestZKRMStateStore(conf, workingZnode);
|
||||||
|
@ -23,7 +23,6 @@
|
|||||||
import org.apache.curator.retry.RetryNTimes;
|
import org.apache.curator.retry.RetryNTimes;
|
||||||
import org.apache.curator.test.TestingServer;
|
import org.apache.curator.test.TestingServer;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
|
||||||
import org.apache.hadoop.ha.HAServiceProtocol;
|
import org.apache.hadoop.ha.HAServiceProtocol;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.service.Service;
|
import org.apache.hadoop.service.Service;
|
||||||
@ -100,7 +99,7 @@ public void setUp() throws Exception {
|
|||||||
curatorTestingServer = setupCuratorServer();
|
curatorTestingServer = setupCuratorServer();
|
||||||
curatorFramework = setupCuratorFramework(curatorTestingServer);
|
curatorFramework = setupCuratorFramework(curatorTestingServer);
|
||||||
|
|
||||||
conf.set(CommonConfigurationKeys.ZK_ADDRESS,
|
conf.set(YarnConfiguration.RM_ZK_ADDRESS,
|
||||||
curatorTestingServer.getConnectString());
|
curatorTestingServer.getConnectString());
|
||||||
rm = new MockRM(conf);
|
rm = new MockRM(conf);
|
||||||
rm.start();
|
rm.start();
|
||||||
|
@ -177,7 +177,7 @@ ZooKeeper: one must set the ZooKeeper settings for Hadoop:
|
|||||||
| Property | Example | Description |
|
| Property | Example | Description |
|
||||||
|:------------------------------------|:------------------------------------------------------------------------------------|:----------------------------------------|
|
|:------------------------------------|:------------------------------------------------------------------------------------|:----------------------------------------|
|
||||||
| `yarn.federation.state-store.class` | `org.apache.hadoop.yarn.server.federation.store.impl.ZookeeperFederationStateStore` | The type of state-store to use. |
|
| `yarn.federation.state-store.class` | `org.apache.hadoop.yarn.server.federation.store.impl.ZookeeperFederationStateStore` | The type of state-store to use. |
|
||||||
| `hadoop.zk.address` | `host:port` | The address for the ZooKeeper ensemble. |
|
| `yarn.federation.state-store.zk.address` | `host:port` | The address for the ZooKeeper ensemble. |
|
||||||
|
|
||||||
SQL: one must setup the following parameters:
|
SQL: one must setup the following parameters:
|
||||||
|
|
||||||
@ -1006,7 +1006,7 @@ Example of Machine-Role Mapping(Exclude HDFS):
|
|||||||
|
|
||||||
<!-- ZK Address. -->
|
<!-- ZK Address. -->
|
||||||
<property>
|
<property>
|
||||||
<name>hadoop.zk.address</name>
|
<name>yarn.federation.state-store.zk.address</name>
|
||||||
<value>zkHost:zkPort</value>
|
<value>zkHost:zkPort</value>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
@ -1067,7 +1067,7 @@ $HADOOP_HOME/bin/yarn --daemon start resourcemanager
|
|||||||
|
|
||||||
<!-- ZK Address. -->
|
<!-- ZK Address. -->
|
||||||
<property>
|
<property>
|
||||||
<name>hadoop.zk.address</name>
|
<name>yarn.federation.state-store.zk.address</name>
|
||||||
<value>zkHost:zkPort</value>
|
<value>zkHost:zkPort</value>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
@ -1135,7 +1135,7 @@ After we have finished configuring the `YARN-2` cluster, we can proceed with sta
|
|||||||
|
|
||||||
<!-- ZK Address. -->
|
<!-- ZK Address. -->
|
||||||
<property>
|
<property>
|
||||||
<name>hadoop.zk.address</name>
|
<name>yarn.federation.state-store.zk.address</name>
|
||||||
<value>zkHost:zkPort</value>
|
<value>zkHost:zkPort</value>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
@ -56,7 +56,7 @@ Most of the failover functionality is tunable using various configuration proper
|
|||||||
|
|
||||||
| Configuration Properties | Description |
|
| Configuration Properties | Description |
|
||||||
|:---- |:---- |
|
|:---- |:---- |
|
||||||
| `hadoop.zk.address` | Address of the ZK-quorum. Used both for the state-store and embedded leader-election. |
|
| `yarn.resourcemanager.zk-address` | Address of the ZK-quorum. Used both for the state-store and embedded leader-election. |
|
||||||
| `yarn.resourcemanager.ha.enabled` | Enable RM HA. |
|
| `yarn.resourcemanager.ha.enabled` | Enable RM HA. |
|
||||||
| `yarn.resourcemanager.ha.rm-ids` | List of logical IDs for the RMs. e.g., "rm1,rm2". |
|
| `yarn.resourcemanager.ha.rm-ids` | List of logical IDs for the RMs. e.g., "rm1,rm2". |
|
||||||
| `yarn.resourcemanager.hostname.`*rm-id* | For each *rm-id*, specify the hostname the RM corresponds to. Alternately, one could set each of the RM's service addresses. |
|
| `yarn.resourcemanager.hostname.`*rm-id* | For each *rm-id*, specify the hostname the RM corresponds to. Alternately, one could set each of the RM's service addresses. |
|
||||||
@ -112,7 +112,7 @@ Here is the sample of minimal setup for RM failover.
|
|||||||
<value>master2:8088</value>
|
<value>master2:8088</value>
|
||||||
</property>
|
</property>
|
||||||
<property>
|
<property>
|
||||||
<name>hadoop.zk.address</name>
|
<name>yarn.resourcemanager.zk-address</name>
|
||||||
<value>zk1:2181,zk2:2181,zk3:2181</value>
|
<value>zk1:2181,zk2:2181,zk3:2181</value>
|
||||||
</property>
|
</property>
|
||||||
```
|
```
|
||||||
|
@ -93,7 +93,7 @@ This section describes the configurations involved to enable RM Restart feature.
|
|||||||
|
|
||||||
| Property | Description |
|
| Property | Description |
|
||||||
|:---- |:---- |
|
|:---- |:---- |
|
||||||
| `hadoop.zk.address` | Comma separated list of Host:Port pairs. Each corresponds to a ZooKeeper server (e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002") to be used by the RM for storing RM state. |
|
| `yarn.resourcemanager.zk-address` | Comma separated list of Host:Port pairs. Each corresponds to a ZooKeeper server (e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002") to be used by the RM for storing RM state. |
|
||||||
| `yarn.resourcemanager.zk-state-store.parent-path` | The full path of the root znode where RM state will be stored. Default value is /rmstore. |
|
| `yarn.resourcemanager.zk-state-store.parent-path` | The full path of the root znode where RM state will be stored. Default value is /rmstore. |
|
||||||
|
|
||||||
* Configure the retry policy state-store client uses to connect with the ZooKeeper server.
|
* Configure the retry policy state-store client uses to connect with the ZooKeeper server.
|
||||||
@ -157,7 +157,7 @@ Below is a minimum set of configurations for enabling RM work-preserving restart
|
|||||||
(e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002") to be used by the RM for storing RM state.
|
(e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002") to be used by the RM for storing RM state.
|
||||||
This must be supplied when using org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore
|
This must be supplied when using org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore
|
||||||
as the value for yarn.resourcemanager.store.class</description>
|
as the value for yarn.resourcemanager.store.class</description>
|
||||||
<name>hadoop.zk.address</name>
|
<name>yarn.resourcemanager.zk-address</name>
|
||||||
<value>127.0.0.1:2181</value>
|
<value>127.0.0.1:2181</value>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user