HADOOP-8215. Security support for ZK Failover controller. Contributed by Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-3042@1309185 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Todd Lipcon 2012-04-03 23:37:15 +00:00
parent 39a5fd5c71
commit 30e1b3bba8
12 changed files with 472 additions and 26 deletions

View File

@ -7,3 +7,5 @@ branch is merged.
HADOOP-8220. ZKFailoverController doesn't handle failure to become active correctly (todd)
HADOOP-8228. Auto HA: Refactor tests and add stress tests. (todd)
HADOOP-8215. Security support for ZK Failover controller (todd)

View File

@ -29,6 +29,7 @@
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.ha.HAZKUtil.ZKAuthInfo;
import org.apache.hadoop.util.StringUtils;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.KeeperException;
@ -161,6 +162,7 @@ static enum State {
private final String zkHostPort;
private final int zkSessionTimeout;
private final List<ACL> zkAcl;
private final List<ZKAuthInfo> zkAuthInfo;
private byte[] appData;
private final String zkLockFilePath;
private final String zkBreadCrumbPath;
@ -192,6 +194,8 @@ static enum State {
* znode under which to create the lock
* @param acl
* ZooKeeper ACL's
* @param authInfo a list of authentication credentials to add to the
* ZK connection
* @param app
* reference to callback interface object
* @throws IOException
@ -199,6 +203,7 @@ static enum State {
*/
public ActiveStandbyElector(String zookeeperHostPorts,
int zookeeperSessionTimeout, String parentZnodeName, List<ACL> acl,
List<ZKAuthInfo> authInfo,
ActiveStandbyElectorCallback app) throws IOException,
HadoopIllegalArgumentException {
if (app == null || acl == null || parentZnodeName == null
@ -208,6 +213,7 @@ public ActiveStandbyElector(String zookeeperHostPorts,
zkHostPort = zookeeperHostPorts;
zkSessionTimeout = zookeeperSessionTimeout;
zkAcl = acl;
zkAuthInfo = authInfo;
appClient = app;
znodeWorkingDir = parentZnodeName;
zkLockFilePath = znodeWorkingDir + "/" + LOCK_FILENAME;
@ -587,6 +593,9 @@ synchronized void processWatchEvent(ZooKeeper zk, WatchedEvent event) {
protected synchronized ZooKeeper getNewZooKeeper() throws IOException {
ZooKeeper zk = new ZooKeeper(zkHostPort, zkSessionTimeout, null);
zk.register(new WatcherWithClientRef(zk));
for (ZKAuthInfo auth : zkAuthInfo) {
zk.addAuthInfo(auth.getScheme(), auth.getAuth());
}
return zk;
}

View File

@ -0,0 +1,199 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ha;
import java.io.File;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import com.google.common.base.Charsets;
import com.google.common.base.Splitter;
import com.google.common.collect.Lists;
import com.google.common.io.Files;
/**
* Utilities for working with ZooKeeper.
*/
@InterfaceAudience.Private
public class HAZKUtil {
/**
* Parse ACL permission string, partially borrowed from
* ZooKeeperMain private method
*/
private static int getPermFromString(String permString) {
int perm = 0;
for (int i = 0; i < permString.length(); i++) {
char c = permString.charAt(i);
switch (c) {
case 'r':
perm |= ZooDefs.Perms.READ;
break;
case 'w':
perm |= ZooDefs.Perms.WRITE;
break;
case 'c':
perm |= ZooDefs.Perms.CREATE;
break;
case 'd':
perm |= ZooDefs.Perms.DELETE;
break;
case 'a':
perm |= ZooDefs.Perms.ADMIN;
break;
default:
throw new BadAclFormatException(
"Invalid permission '" + c + "' in permission string '" +
permString + "'");
}
}
return perm;
}
/**
* Parse comma separated list of ACL entries to secure generated nodes, e.g.
* <code>sasl:hdfs/host1@MY.DOMAIN:cdrwa,sasl:hdfs/host2@MY.DOMAIN:cdrwa</code>
*
* @return ACL list
* @throws HadoopIllegalArgumentException if an ACL is invalid
*/
public static List<ACL> parseACLs(String aclString) {
List<ACL> acl = Lists.newArrayList();
if (aclString == null) {
return acl;
}
List<String> aclComps = Lists.newArrayList(
Splitter.on(',').omitEmptyStrings().trimResults()
.split(aclString));
for (String a : aclComps) {
// from ZooKeeperMain private method
int firstColon = a.indexOf(':');
int lastColon = a.lastIndexOf(':');
if (firstColon == -1 || lastColon == -1 || firstColon == lastColon) {
throw new BadAclFormatException(
"ACL '" + a + "' not of expected form scheme:id:perm");
}
ACL newAcl = new ACL();
newAcl.setId(new Id(a.substring(0, firstColon), a.substring(
firstColon + 1, lastColon)));
newAcl.setPerms(getPermFromString(a.substring(lastColon + 1)));
acl.add(newAcl);
}
return acl;
}
/**
* Parse a comma-separated list of authentication mechanisms. Each
* such mechanism should be of the form 'scheme:auth' -- the same
* syntax used for the 'addAuth' command in the ZK CLI.
*
* @param authString the comma-separated auth mechanisms
* @return a list of parsed authentications
*/
public static List<ZKAuthInfo> parseAuth(String authString) {
List<ZKAuthInfo> ret = Lists.newArrayList();
if (authString == null) {
return ret;
}
List<String> authComps = Lists.newArrayList(
Splitter.on(',').omitEmptyStrings().trimResults()
.split(authString));
for (String comp : authComps) {
String parts[] = comp.split(":", 2);
if (parts.length != 2) {
throw new BadAuthFormatException(
"Auth '" + comp + "' not of expected form scheme:auth");
}
ret.add(new ZKAuthInfo(parts[0],
parts[1].getBytes(Charsets.UTF_8)));
}
return ret;
}
/**
* Because ZK ACLs and authentication information may be secret,
* allow the configuration values to be indirected through a file
* by specifying the configuration as "@/path/to/file". If this
* syntax is used, this function will return the contents of the file
* as a String.
*
* @param valInConf the value from the Configuration
* @return either the same value, or the contents of the referenced
* file if the configured value starts with "@"
* @throws IOException if the file cannot be read
*/
public static String resolveConfIndirection(String valInConf)
throws IOException {
if (valInConf == null) return null;
if (!valInConf.startsWith("@")) {
return valInConf;
}
String path = valInConf.substring(1).trim();
return Files.toString(new File(path), Charsets.UTF_8).trim();
}
/**
* An authentication token passed to ZooKeeper.addAuthInfo
*/
static class ZKAuthInfo {
private final String scheme;
private final byte[] auth;
public ZKAuthInfo(String scheme, byte[] auth) {
super();
this.scheme = scheme;
this.auth = auth;
}
String getScheme() {
return scheme;
}
byte[] getAuth() {
return auth;
}
}
static class BadAclFormatException extends HadoopIllegalArgumentException {
private static final long serialVersionUID = 1L;
public BadAclFormatException(String message) {
super(message);
}
}
static class BadAuthFormatException extends HadoopIllegalArgumentException {
private static final long serialVersionUID = 1L;
public BadAuthFormatException(String message) {
super(message);
}
}
}

View File

@ -19,6 +19,7 @@
import java.io.IOException;
import java.security.PrivilegedAction;
import java.util.Collections;
import java.util.List;
import org.apache.commons.logging.Log;
@ -27,11 +28,12 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.ActiveStandbyElector.ActiveStandbyElectorCallback;
import org.apache.hadoop.ha.HAZKUtil.ZKAuthInfo;
import org.apache.hadoop.ha.HealthMonitor.State;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.hadoop.util.ToolRunner;
import org.apache.zookeeper.data.ACL;
import com.google.common.annotations.VisibleForTesting;
@ -47,6 +49,9 @@ public abstract class ZKFailoverController implements Tool {
private static final String ZK_SESSION_TIMEOUT_KEY = "ha.zookeeper.session-timeout.ms";
private static final int ZK_SESSION_TIMEOUT_DEFAULT = 5*1000;
private static final String ZK_PARENT_ZNODE_KEY = "ha.zookeeper.parent-znode";
public static final String ZK_ACL_KEY = "ha.zookeeper.acl";
private static final String ZK_ACL_DEFAULT = "world:anyone:rwcda";
public static final String ZK_AUTH_KEY = "ha.zookeeper.auth";
static final String ZK_PARENT_ZNODE_DEFAULT = "/hadoop-ha";
/** Unable to format the parent znode in ZK */
@ -80,7 +85,7 @@ public void setConf(Configuration conf) {
protected abstract byte[] targetToData(HAServiceTarget target);
protected abstract HAServiceTarget getLocalTarget();
protected abstract HAServiceTarget dataToTarget(byte[] data);
protected abstract void loginAsFCUser() throws IOException;
@Override
public Configuration getConf() {
@ -89,8 +94,7 @@ public Configuration getConf() {
@Override
public int run(final String[] args) throws Exception {
// TODO: need to hook DFS here to find the NN keytab info, etc,
// similar to what DFSHAAdmin does. Annoying that this is in common.
loginAsFCUser();
try {
return SecurityUtil.doAsLoginUserOrFatal(new PrivilegedAction<Integer>() {
@Override
@ -107,6 +111,7 @@ public Integer run() {
}
}
private int doRun(String[] args)
throws HadoopIllegalArgumentException, IOException, InterruptedException {
initZK();
@ -218,9 +223,26 @@ private void initZK() throws HadoopIllegalArgumentException, IOException {
ZK_SESSION_TIMEOUT_DEFAULT);
parentZnode = conf.get(ZK_PARENT_ZNODE_KEY,
ZK_PARENT_ZNODE_DEFAULT);
// TODO: need ZK ACL support in config, also maybe auth!
List<ACL> zkAcls = Ids.OPEN_ACL_UNSAFE;
// Parse ACLs from configuration.
String zkAclConf = conf.get(ZK_ACL_KEY, ZK_ACL_DEFAULT);
zkAclConf = HAZKUtil.resolveConfIndirection(zkAclConf);
List<ACL> zkAcls = HAZKUtil.parseACLs(zkAclConf);
if (zkAcls.isEmpty()) {
zkAcls = Ids.CREATOR_ALL_ACL;
}
// Parse authentication from configuration.
String zkAuthConf = conf.get(ZK_AUTH_KEY);
zkAuthConf = HAZKUtil.resolveConfIndirection(zkAuthConf);
List<ZKAuthInfo> zkAuths;
if (zkAuthConf != null) {
zkAuths = HAZKUtil.parseAuth(zkAuthConf);
} else {
zkAuths = Collections.emptyList();
}
// Sanity check configuration.
Preconditions.checkArgument(zkQuorum != null,
"Missing required configuration '%s' for ZooKeeper quorum",
ZK_QUORUM_KEY);
@ -229,7 +251,8 @@ private void initZK() throws HadoopIllegalArgumentException, IOException {
elector = new ActiveStandbyElector(zkQuorum,
zkTimeout, parentZnode, zkAcls, new ElectorCallbacks());
zkTimeout, parentZnode, zkAcls, zkAuths,
new ElectorCallbacks());
}
private synchronized void mainLoop() throws InterruptedException {

View File

@ -20,6 +20,7 @@
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.commons.logging.Log;
@ -275,5 +276,9 @@ protected HAServiceTarget dataToTarget(byte[] data) {
protected HAServiceTarget getLocalTarget() {
return localTarget;
}
@Override
protected void loginAsFCUser() throws IOException {
}
}
}

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.ha;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import org.apache.zookeeper.AsyncCallback;
@ -40,6 +41,7 @@
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.ha.ActiveStandbyElector.ActiveStandbyElectorCallback;
import org.apache.hadoop.ha.ActiveStandbyElector.ActiveNotFoundException;
import org.apache.hadoop.ha.HAZKUtil.ZKAuthInfo;
public class TestActiveStandbyElector {
@ -55,7 +57,8 @@ class ActiveStandbyElectorTester extends ActiveStandbyElector {
ActiveStandbyElectorTester(String hostPort, int timeout, String parent,
List<ACL> acl, ActiveStandbyElectorCallback app) throws IOException {
super(hostPort, timeout, parent, acl, app);
super(hostPort, timeout, parent, acl,
Collections.<ZKAuthInfo>emptyList(), app);
}
@Override

View File

@ -22,11 +22,13 @@
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.util.Collections;
import java.util.UUID;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.ha.ActiveStandbyElector.ActiveStandbyElectorCallback;
import org.apache.hadoop.ha.ActiveStandbyElector.State;
import org.apache.hadoop.ha.HAZKUtil.ZKAuthInfo;
import org.apache.log4j.Level;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.server.ZooKeeperServer;
@ -69,7 +71,8 @@ public void setUp() throws Exception {
cbs[i] = Mockito.mock(ActiveStandbyElectorCallback.class);
appDatas[i] = Ints.toByteArray(i);
electors[i] = new ActiveStandbyElector(
hostPort, 5000, PARENT_DIR, Ids.OPEN_ACL_UNSAFE, cbs[i]);
hostPort, 5000, PARENT_DIR, Ids.OPEN_ACL_UNSAFE,
Collections.<ZKAuthInfo>emptyList(), cbs[i]);
}
}

View File

@ -31,7 +31,6 @@
import org.apache.hadoop.ha.TestNodeFencer.AlwaysFailFencer;
import static org.apache.hadoop.ha.TestNodeFencer.setupFencer;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.test.MockitoUtil;
import org.junit.Test;
import org.mockito.Mockito;

View File

@ -0,0 +1,135 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ha;
import static org.junit.Assert.*;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.ha.HAZKUtil.BadAclFormatException;
import org.apache.hadoop.ha.HAZKUtil.ZKAuthInfo;
import org.apache.zookeeper.ZooDefs.Perms;
import org.apache.zookeeper.data.ACL;
import org.junit.Test;
import com.google.common.base.Charsets;
import com.google.common.io.Files;
public class TestHAZKUtil {
private static final String TEST_ROOT_DIR = System.getProperty(
"test.build.data", "/tmp") + "/TestHAZKUtil";
private static final File TEST_FILE = new File(TEST_ROOT_DIR,
"test-file");
/** A path which is expected not to exist */
private static final String BOGUS_FILE = "/xxxx-this-does-not-exist";
@Test
public void testEmptyACL() {
List<ACL> result = HAZKUtil.parseACLs("");
assertTrue(result.isEmpty());
}
@Test
public void testNullACL() {
List<ACL> result = HAZKUtil.parseACLs(null);
assertTrue(result.isEmpty());
}
@Test
public void testInvalidACLs() {
badAcl("a:b",
"ACL 'a:b' not of expected form scheme:id:perm"); // not enough parts
badAcl("a",
"ACL 'a' not of expected form scheme:id:perm"); // not enough parts
badAcl("password:foo:rx",
"Invalid permission 'x' in permission string 'rx'");
}
private static void badAcl(String acls, String expectedErr) {
try {
HAZKUtil.parseACLs(acls);
fail("Should have failed to parse '" + acls + "'");
} catch (BadAclFormatException e) {
assertEquals(expectedErr, e.getMessage());
}
}
@Test
public void testGoodACLs() {
List<ACL> result = HAZKUtil.parseACLs(
"sasl:hdfs/host1@MY.DOMAIN:cdrwa, sasl:hdfs/host2@MY.DOMAIN:ca");
ACL acl0 = result.get(0);
assertEquals(Perms.CREATE | Perms.DELETE | Perms.READ |
Perms.WRITE | Perms.ADMIN, acl0.getPerms());
assertEquals("sasl", acl0.getId().getScheme());
assertEquals("hdfs/host1@MY.DOMAIN", acl0.getId().getId());
ACL acl1 = result.get(1);
assertEquals(Perms.CREATE | Perms.ADMIN, acl1.getPerms());
assertEquals("sasl", acl1.getId().getScheme());
assertEquals("hdfs/host2@MY.DOMAIN", acl1.getId().getId());
}
@Test
public void testEmptyAuth() {
List<ZKAuthInfo> result = HAZKUtil.parseAuth("");
assertTrue(result.isEmpty());
}
@Test
public void testNullAuth() {
List<ZKAuthInfo> result = HAZKUtil.parseAuth(null);
assertTrue(result.isEmpty());
}
@Test
public void testGoodAuths() {
List<ZKAuthInfo> result = HAZKUtil.parseAuth(
"scheme:data,\n scheme2:user:pass");
assertEquals(2, result.size());
ZKAuthInfo auth0 = result.get(0);
assertEquals("scheme", auth0.getScheme());
assertEquals("data", new String(auth0.getAuth()));
ZKAuthInfo auth1 = result.get(1);
assertEquals("scheme2", auth1.getScheme());
assertEquals("user:pass", new String(auth1.getAuth()));
}
@Test
public void testConfIndirection() throws IOException {
assertNull(HAZKUtil.resolveConfIndirection(null));
assertEquals("x", HAZKUtil.resolveConfIndirection("x"));
TEST_FILE.getParentFile().mkdirs();
Files.write("hello world", TEST_FILE, Charsets.UTF_8);
assertEquals("hello world", HAZKUtil.resolveConfIndirection(
"@" + TEST_FILE.getAbsolutePath()));
try {
HAZKUtil.resolveConfIndirection("@" + BOGUS_FILE);
fail("Did not throw for non-existent file reference");
} catch (FileNotFoundException fnfe) {
assertTrue(fnfe.getMessage().startsWith(BOGUS_FILE));
}
}
}

View File

@ -17,9 +17,10 @@
*/
package org.apache.hadoop.ha;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.*;
import java.io.File;
import java.security.NoSuchAlgorithmException;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
@ -27,6 +28,10 @@
import org.apache.hadoop.ha.HealthMonitor.State;
import org.apache.hadoop.ha.MiniZKFCCluster.DummyZKFC;
import org.apache.log4j.Level;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;
import org.apache.zookeeper.test.ClientBase;
import org.junit.Before;
import org.junit.Test;
@ -36,6 +41,24 @@ public class TestZKFailoverController extends ClientBase {
private Configuration conf;
private MiniZKFCCluster cluster;
// Set up ZK digest-based credentials for the purposes of the tests,
// to make sure all of our functionality works with auth and ACLs
// present.
private static final String DIGEST_USER_PASS="test-user:test-password";
private static final String TEST_AUTH_GOOD =
"digest:" + DIGEST_USER_PASS;
private static final String DIGEST_USER_HASH;
static {
try {
DIGEST_USER_HASH = DigestAuthenticationProvider.generateDigest(
DIGEST_USER_PASS);
} catch (NoSuchAlgorithmException e) {
throw new RuntimeException(e);
}
}
private static final String TEST_ACL =
"digest:" + DIGEST_USER_HASH + ":rwcda";
static {
((Log4JLogger)ActiveStandbyElector.LOG).getLogger().setLevel(Level.ALL);
}
@ -50,6 +73,9 @@ public void setUp() throws Exception {
@Before
public void setupConfAndServices() {
conf = new Configuration();
conf.set(ZKFailoverController.ZK_ACL_KEY, TEST_ACL);
conf.set(ZKFailoverController.ZK_AUTH_KEY, TEST_AUTH_GOOD);
conf.set(ZKFailoverController.ZK_QUORUM_KEY, hostPort);
this.cluster = new MiniZKFCCluster(conf, getServer(serverFactory));
}
@ -77,6 +103,28 @@ public void testFormatZK() throws Exception {
assertEquals(0, runFC(svc, "-formatZK", "-force"));
}
/**
* Test that, if ACLs are specified in the configuration, that
* it sets the ACLs when formatting the parent node.
*/
@Test(timeout=15000)
public void testFormatSetsAcls() throws Exception {
// Format the base dir, should succeed
DummyHAService svc = cluster.getService(1);
assertEquals(0, runFC(svc, "-formatZK"));
ZooKeeper otherClient = createClient();
try {
// client without auth should not be able to read it
Stat stat = new Stat();
otherClient.getData(ZKFailoverController.ZK_PARENT_ZNODE_DEFAULT,
false, stat);
fail("Was able to read data without authenticating!");
} catch (KeeperException.NoAuthException nae) {
// expected
}
}
/**
* Test that the ZKFC won't run if fencing is not configured for the
* local service.

View File

@ -46,6 +46,18 @@ protected void setErrOut(PrintStream errOut) {
@Override
public void setConf(Configuration conf) {
if (conf != null) {
conf = addSecurityConfiguration(conf);
}
super.setConf(conf);
}
/**
* Add the requisite security principal settings to the given Configuration,
* returning a copy.
* @param conf the original config
* @return a copy with the security settings added
*/
public static Configuration addSecurityConfiguration(Configuration conf) {
// Make a copy so we don't mutate it. Also use an HdfsConfiguration to
// force loading of hdfs-site.xml.
conf = new HdfsConfiguration(conf);
@ -57,8 +69,7 @@ public void setConf(Configuration conf) {
conf.set(CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_USER_NAME_KEY,
nameNodePrincipal);
}
super.setConf(conf);
return conf;
}
/**

View File

@ -17,6 +17,10 @@
*/
package org.apache.hadoop.hdfs.tools;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY;
import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.commons.logging.Log;
@ -28,9 +32,9 @@
import org.apache.hadoop.ha.ZKFailoverController;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.ha.proto.HAZKInfoProtos.ActiveNodeInfo;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ToolRunner;
@ -81,9 +85,7 @@ protected byte[] targetToData(HAServiceTarget target) {
@Override
public void setConf(Configuration conf) {
// Use HdfsConfiguration here to force hdfs-site.xml to load
localNNConf = new HdfsConfiguration(conf);
localNNConf = DFSHAAdmin.addSecurityConfiguration(conf);
String nsId = DFSUtil.getNamenodeNameServiceId(conf);
if (!HAUtil.isHAEnabled(localNNConf, nsId)) {
@ -107,6 +109,13 @@ public HAServiceTarget getLocalTarget() {
return localTarget;
}
@Override
public void loginAsFCUser() throws IOException {
InetSocketAddress socAddr = NameNode.getAddress(localNNConf);
SecurityUtil.login(getConf(), DFS_NAMENODE_KEYTAB_FILE_KEY,
DFS_NAMENODE_USER_NAME_KEY, socAddr.getHostName());
}
public static void main(String args[])
throws Exception {
System.exit(ToolRunner.run(