YARN-1528. Allow setting auth for ZK connections. (kasha)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1573014 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
9bdd272fd7
commit
1ba26e3e68
@ -241,6 +241,8 @@ Release 2.4.0 - UNRELEASED
|
||||
YARN-1301. Added the INFO level log of the non-empty blacklist additions
|
||||
and removals inside ApplicationMasterService. (Tsuyoshi Ozawa via zjshen)
|
||||
|
||||
YARN-1528. Allow setting auth for ZK connections. (kasha)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
BUG FIXES
|
||||
|
@ -335,6 +335,8 @@ public class YarnConfiguration extends Configuration {
|
||||
public static final String RM_ZK_ACL = RM_ZK_PREFIX + "acl";
|
||||
public static final String DEFAULT_RM_ZK_ACL = "world:anyone:rwcda";
|
||||
|
||||
public static final String RM_ZK_AUTH = RM_ZK_PREFIX + "auth";
|
||||
|
||||
public static final String ZK_STATE_STORE_PREFIX =
|
||||
RM_PREFIX + "zk-state-store.";
|
||||
|
||||
|
@ -31,14 +31,12 @@
|
||||
import org.apache.hadoop.util.ZKUtil;
|
||||
import org.apache.hadoop.yarn.conf.HAUtil;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.apache.zookeeper.data.ACL;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
@InterfaceAudience.Private
|
||||
@ -88,18 +86,8 @@ protected synchronized void serviceInit(Configuration conf)
|
||||
long zkSessionTimeout = conf.getLong(YarnConfiguration.RM_ZK_TIMEOUT_MS,
|
||||
YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS);
|
||||
|
||||
String zkAclConf = conf.get(YarnConfiguration.RM_ZK_ACL,
|
||||
YarnConfiguration.DEFAULT_RM_ZK_ACL);
|
||||
List<ACL> zkAcls;
|
||||
try {
|
||||
zkAcls = ZKUtil.parseACLs(ZKUtil.resolveConfIndirection(zkAclConf));
|
||||
} catch (ZKUtil.BadAclFormatException bafe) {
|
||||
throw new YarnRuntimeException(
|
||||
YarnConfiguration.RM_ZK_ACL + "has ill-formatted ACLs");
|
||||
}
|
||||
|
||||
// TODO (YARN-1528): ZKAuthInfo to be set for rm-store and elector
|
||||
List<ZKUtil.ZKAuthInfo> zkAuths = Collections.emptyList();
|
||||
List<ACL> zkAcls = RMZKUtils.getZKAcls(conf);
|
||||
List<ZKUtil.ZKAuthInfo> zkAuths = RMZKUtils.getZKAuths(conf);
|
||||
|
||||
elector = new ActiveStandbyElector(zkQuorum, (int) zkSessionTimeout,
|
||||
electionZNode, zkAcls, zkAuths, this);
|
||||
|
@ -0,0 +1,74 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.util.ZKUtil;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.zookeeper.data.ACL;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Helper class that provides utility methods specific to ZK operations
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class RMZKUtils {
|
||||
private static final Log LOG = LogFactory.getLog(RMZKUtils.class);
|
||||
|
||||
/**
|
||||
* Utility method to fetch the ZK ACLs from the configuration
|
||||
*/
|
||||
public static List<ACL> getZKAcls(Configuration conf) throws Exception {
|
||||
// Parse authentication from configuration.
|
||||
String zkAclConf =
|
||||
conf.get(YarnConfiguration.RM_ZK_ACL,
|
||||
YarnConfiguration.DEFAULT_RM_ZK_ACL);
|
||||
try {
|
||||
zkAclConf = ZKUtil.resolveConfIndirection(zkAclConf);
|
||||
return ZKUtil.parseACLs(zkAclConf);
|
||||
} catch (Exception e) {
|
||||
LOG.error("Couldn't read ACLs based on " + YarnConfiguration.RM_ZK_ACL);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Utility method to fetch ZK auth info from the configuration
|
||||
*/
|
||||
public static List<ZKUtil.ZKAuthInfo> getZKAuths(Configuration conf)
|
||||
throws Exception {
|
||||
String zkAuthConf = conf.get(YarnConfiguration.RM_ZK_AUTH);
|
||||
try {
|
||||
zkAuthConf = ZKUtil.resolveConfIndirection(zkAuthConf);
|
||||
if (zkAuthConf != null) {
|
||||
return ZKUtil.parseAuth(zkAuthConf);
|
||||
} else {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOG.error("Couldn't read Auth based on " + YarnConfiguration.RM_ZK_AUTH);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
@ -48,6 +48,7 @@
|
||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RMStateVersionProto;
|
||||
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMZKUtils;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
|
||||
@ -91,6 +92,7 @@ public class ZKRMStateStore extends RMStateStore {
|
||||
private int zkSessionTimeout;
|
||||
private long zkRetryInterval;
|
||||
private List<ACL> zkAcl;
|
||||
private List<ZKUtil.ZKAuthInfo> zkAuths;
|
||||
|
||||
/**
|
||||
*
|
||||
@ -200,18 +202,9 @@ public synchronized void initInternal(Configuration conf) throws Exception {
|
||||
zkRetryInterval =
|
||||
conf.getLong(YarnConfiguration.RM_ZK_RETRY_INTERVAL_MS,
|
||||
YarnConfiguration.DEFAULT_RM_ZK_RETRY_INTERVAL_MS);
|
||||
// Parse authentication from configuration.
|
||||
String zkAclConf =
|
||||
conf.get(YarnConfiguration.RM_ZK_ACL,
|
||||
YarnConfiguration.DEFAULT_RM_ZK_ACL);
|
||||
zkAclConf = ZKUtil.resolveConfIndirection(zkAclConf);
|
||||
|
||||
try {
|
||||
zkAcl = ZKUtil.parseACLs(zkAclConf);
|
||||
} catch (ZKUtil.BadAclFormatException bafe) {
|
||||
LOG.error("Invalid format for " + YarnConfiguration.RM_ZK_ACL);
|
||||
throw bafe;
|
||||
}
|
||||
zkAcl = RMZKUtils.getZKAcls(conf);
|
||||
zkAuths = RMZKUtils.getZKAuths(conf);
|
||||
|
||||
zkRootNodePath = getNodePath(znodeWorkingPath, ROOT_ZNODE_NAME);
|
||||
rmAppRoot = getNodePath(zkRootNodePath, RM_APP_ROOT);
|
||||
@ -952,6 +945,9 @@ private synchronized void createConnection()
|
||||
retries++) {
|
||||
try {
|
||||
zkClient = getNewZooKeeper();
|
||||
for (ZKUtil.ZKAuthInfo zkAuth : zkAuths) {
|
||||
zkClient.addAuthInfo(zkAuth.getScheme(), zkAuth.getAuth());
|
||||
}
|
||||
if (useDefaultFencingScheme) {
|
||||
zkClient.addAuthInfo(zkRootNodeAuthScheme,
|
||||
(zkRootNodeUsername + ":" + zkRootNodePassword).getBytes());
|
||||
|
@ -32,10 +32,12 @@
|
||||
import org.apache.zookeeper.Watcher;
|
||||
import org.apache.zookeeper.ZooDefs;
|
||||
import org.apache.zookeeper.ZooKeeper;
|
||||
import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.security.NoSuchAlgorithmException;
|
||||
import java.util.concurrent.CyclicBarrier;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
@ -49,6 +51,20 @@ public class TestZKRMStateStoreZKClientConnections extends
|
||||
private Log LOG =
|
||||
LogFactory.getLog(TestZKRMStateStoreZKClientConnections.class);
|
||||
|
||||
private static final String DIGEST_USER_PASS="test-user:test-password";
|
||||
private static final String TEST_AUTH_GOOD = "digest:" + DIGEST_USER_PASS;
|
||||
private static final String DIGEST_USER_HASH;
|
||||
static {
|
||||
try {
|
||||
DIGEST_USER_HASH = DigestAuthenticationProvider.generateDigest(
|
||||
DIGEST_USER_PASS);
|
||||
} catch (NoSuchAlgorithmException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
private static final String TEST_ACL = "digest:" + DIGEST_USER_HASH + ":rwcda";
|
||||
|
||||
|
||||
class TestZKClient {
|
||||
|
||||
ZKRMStateStore store;
|
||||
@ -252,4 +268,16 @@ public void testInvalidZKAclConfiguration() {
|
||||
fail(error);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testZKAuths() throws Exception {
|
||||
TestZKClient zkClientTester = new TestZKClient();
|
||||
YarnConfiguration conf = new YarnConfiguration();
|
||||
conf.setInt(YarnConfiguration.RM_ZK_NUM_RETRIES, 1);
|
||||
conf.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, 100);
|
||||
conf.set(YarnConfiguration.RM_ZK_ACL, TEST_ACL);
|
||||
conf.set(YarnConfiguration.RM_ZK_AUTH, TEST_AUTH_GOOD);
|
||||
|
||||
zkClientTester.getRMStateStore(conf);
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user