MAPREDUCE-5265. History server admin service to refresh user and superuser group mappings. Contributed by Ashwin Shankar
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1504645 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
32bc200d54
commit
cc536fe4da
@ -222,3 +222,13 @@ log4j.appender.RMSUMMARY.MaxFileSize=256MB
|
||||
log4j.appender.RMSUMMARY.MaxBackupIndex=20
|
||||
log4j.appender.RMSUMMARY.layout=org.apache.log4j.PatternLayout
|
||||
log4j.appender.RMSUMMARY.layout.ConversionPattern=%d{ISO8601} %p %c{2}: %m%n
|
||||
|
||||
# HS audit log configs
|
||||
#mapreduce.hs.audit.logger=INFO,HSAUDIT
|
||||
#log4j.logger.org.apache.hadoop.mapreduce.v2.hs.HSAuditLogger=${mapreduce.hs.audit.logger}
|
||||
#log4j.additivity.org.apache.hadoop.mapreduce.v2.hs.HSAuditLogger=false
|
||||
#log4j.appender.HSAUDIT=org.apache.log4j.DailyRollingFileAppender
|
||||
#log4j.appender.HSAUDIT.File=${hadoop.log.dir}/hs-audit.log
|
||||
#log4j.appender.HSAUDIT.layout=org.apache.log4j.PatternLayout
|
||||
#log4j.appender.HSAUDIT.layout.ConversionPattern=%d{ISO8601} %p %c{2}: %m%n
|
||||
#log4j.appender.HSAUDIT.DatePattern=.yyyy-MM-dd
|
@ -143,6 +143,9 @@ Release 2.3.0 - UNRELEASED
|
||||
|
||||
NEW FEATURES
|
||||
|
||||
MAPREDUCE-5265. History server admin service to refresh user and superuser
|
||||
group mappings (Ashwin Shankar via jlowe)
|
||||
|
||||
IMPROVEMENTS
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
@ -38,6 +38,7 @@ function print_usage(){
|
||||
echo " historyserver run job history servers as a standalone daemon"
|
||||
echo " distcp <srcurl> <desturl> copy file or directories recursively"
|
||||
echo " archive -archiveName NAME -p <parent path> <src>* <dest> create a hadoop archive"
|
||||
echo " hsadmin job history server admin interface"
|
||||
echo ""
|
||||
echo "Most commands print help when invoked w/o parameters."
|
||||
}
|
||||
@ -92,6 +93,9 @@ elif [ "$COMMAND" = "archive" ] ; then
|
||||
CLASS=org.apache.hadoop.tools.HadoopArchives
|
||||
CLASSPATH=${CLASSPATH}:${TOOL_PATH}
|
||||
HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"
|
||||
elif [ "$COMMAND" = "hsadmin" ] ; then
|
||||
CLASS=org.apache.hadoop.mapreduce.v2.hs.client.HSAdmin
|
||||
HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"
|
||||
else
|
||||
echo $COMMAND - invalid command
|
||||
print_usage
|
||||
|
@ -457,6 +457,9 @@
|
||||
<Match>
|
||||
<Package name="org.apache.hadoop.yarn.proto" />
|
||||
</Match>
|
||||
<Match>
|
||||
<Package name="org.apache.hadoop.mapreduce.v2.hs.proto" />
|
||||
</Match>
|
||||
<Match>
|
||||
<Class name="~org\.apache\.hadoop\.mapreduce\.v2\.proto.*" />
|
||||
</Match>
|
||||
|
@ -37,6 +37,17 @@ public class JHAdminConfig {
|
||||
public static final String DEFAULT_MR_HISTORY_ADDRESS = "0.0.0.0:" +
|
||||
DEFAULT_MR_HISTORY_PORT;
|
||||
|
||||
/** The address of the History server admin interface. */
|
||||
public static final String JHS_ADMIN_ADDRESS = MR_HISTORY_PREFIX
|
||||
+ "admin.address";
|
||||
public static final int DEFAULT_JHS_ADMIN_PORT = 10033;
|
||||
public static final String DEFAULT_JHS_ADMIN_ADDRESS = "0.0.0.0:"
|
||||
+ DEFAULT_JHS_ADMIN_PORT;
|
||||
|
||||
/** ACL of who can be admin of Job history server. */
|
||||
public static final String JHS_ADMIN_ACL = MR_HISTORY_PREFIX + "admin.acl";
|
||||
public static final String DEFAULT_JHS_ADMIN_ACL = "*";
|
||||
|
||||
/** If history cleaning should be enabled or not.*/
|
||||
public static final String MR_HISTORY_CLEANER_ENABLE =
|
||||
MR_HISTORY_PREFIX + "cleaner.enable";
|
||||
|
@ -1138,4 +1138,16 @@
|
||||
<description>Whether to use fixed ports with the minicluster</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>mapreduce.jobhistory.admin.address</name>
|
||||
<value>0.0.0.0:10033</value>
|
||||
<description>The address of the History server admin interface.</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>mapreduce.jobhistory.admin.acl</name>
|
||||
<value>*</value>
|
||||
<description>ACL of who can be admin of the History server.</description>
|
||||
</property>
|
||||
|
||||
</configuration>
|
||||
|
@ -67,6 +67,34 @@
|
||||
</excludes>
|
||||
</configuration>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-maven-plugins</artifactId>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>compile-protoc</id>
|
||||
<phase>generate-sources</phase>
|
||||
<goals>
|
||||
<goal>protoc</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<imports>
|
||||
<param>
|
||||
${basedir}/../../../../hadoop-common-project/hadoop-common/src/main/proto
|
||||
</param>
|
||||
<param>${basedir}/src/main/proto</param>
|
||||
</imports>
|
||||
<source>
|
||||
<directory>${basedir}/src/main/proto</directory>
|
||||
<includes>
|
||||
<include>HSAdminRefreshProtocol.proto</include>
|
||||
</includes>
|
||||
</source>
|
||||
<output>${project.build.directory}/generated-sources/java</output>
|
||||
</configuration>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
|
||||
|
@ -0,0 +1,152 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.mapreduce.v2.hs;
|
||||
|
||||
import java.net.InetAddress;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.ipc.Server;
|
||||
|
||||
@Private
|
||||
public class HSAuditLogger {
|
||||
private static final Log LOG = LogFactory.getLog(HSAuditLogger.class);
|
||||
|
||||
static enum Keys {
|
||||
USER, OPERATION, TARGET, RESULT, IP, PERMISSIONS, DESCRIPTION
|
||||
}
|
||||
|
||||
public static class AuditConstants {
|
||||
static final String SUCCESS = "SUCCESS";
|
||||
static final String FAILURE = "FAILURE";
|
||||
static final String KEY_VAL_SEPARATOR = "=";
|
||||
static final char PAIR_SEPARATOR = '\t';
|
||||
|
||||
// Some commonly used descriptions
|
||||
public static final String UNAUTHORIZED_USER = "Unauthorized user";
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a readable and parseable audit log string for a successful event.
|
||||
*
|
||||
* @param user
|
||||
* User who made the service request.
|
||||
* @param operation
|
||||
* Operation requested by the user.
|
||||
* @param target
|
||||
* The target on which the operation is being performed.
|
||||
*
|
||||
* <br>
|
||||
* <br>
|
||||
* Note that the {@link HSAuditLogger} uses tabs ('\t') as a key-val
|
||||
* delimiter and hence the value fields should not contains tabs
|
||||
* ('\t').
|
||||
*/
|
||||
public static void logSuccess(String user, String operation, String target) {
|
||||
if (LOG.isInfoEnabled()) {
|
||||
LOG.info(createSuccessLog(user, operation, target));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A helper api for creating an audit log for a successful event.
|
||||
*/
|
||||
static String createSuccessLog(String user, String operation, String target) {
|
||||
StringBuilder b = new StringBuilder();
|
||||
start(Keys.USER, user, b);
|
||||
addRemoteIP(b);
|
||||
add(Keys.OPERATION, operation, b);
|
||||
add(Keys.TARGET, target, b);
|
||||
add(Keys.RESULT, AuditConstants.SUCCESS, b);
|
||||
return b.toString();
|
||||
}
|
||||
|
||||
/**
|
||||
* A helper api to add remote IP address
|
||||
*/
|
||||
static void addRemoteIP(StringBuilder b) {
|
||||
InetAddress ip = Server.getRemoteIp();
|
||||
// ip address can be null for testcases
|
||||
if (ip != null) {
|
||||
add(Keys.IP, ip.getHostAddress(), b);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Appends the key-val pair to the passed builder in the following format
|
||||
* <pair-delim>key=value
|
||||
*/
|
||||
static void add(Keys key, String value, StringBuilder b) {
|
||||
b.append(AuditConstants.PAIR_SEPARATOR).append(key.name())
|
||||
.append(AuditConstants.KEY_VAL_SEPARATOR).append(value);
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds the first key-val pair to the passed builder in the following format
|
||||
* key=value
|
||||
*/
|
||||
static void start(Keys key, String value, StringBuilder b) {
|
||||
b.append(key.name()).append(AuditConstants.KEY_VAL_SEPARATOR).append(value);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a readable and parseable audit log string for a failed event.
|
||||
*
|
||||
* @param user
|
||||
* User who made the service request.
|
||||
* @param operation
|
||||
* Operation requested by the user.
|
||||
* @param perm
|
||||
* Target permissions.
|
||||
* @param target
|
||||
* The target on which the operation is being performed.
|
||||
* @param description
|
||||
* Some additional information as to why the operation failed.
|
||||
*
|
||||
* <br>
|
||||
* <br>
|
||||
* Note that the {@link HSAuditLogger} uses tabs ('\t') as a key-val
|
||||
* delimiter and hence the value fields should not contains tabs
|
||||
* ('\t').
|
||||
*/
|
||||
public static void logFailure(String user, String operation, String perm,
|
||||
String target, String description) {
|
||||
if (LOG.isWarnEnabled()) {
|
||||
LOG.warn(createFailureLog(user, operation, perm, target, description));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A helper api for creating an audit log for a failure event.
|
||||
*/
|
||||
static String createFailureLog(String user, String operation, String perm,
|
||||
String target, String description) {
|
||||
StringBuilder b = new StringBuilder();
|
||||
start(Keys.USER, user, b);
|
||||
addRemoteIP(b);
|
||||
add(Keys.OPERATION, operation, b);
|
||||
add(Keys.TARGET, target, b);
|
||||
add(Keys.RESULT, AuditConstants.FAILURE, b);
|
||||
add(Keys.DESCRIPTION, description, b);
|
||||
add(Keys.PERMISSIONS, perm, b);
|
||||
|
||||
return b.toString();
|
||||
}
|
||||
}
|
@ -0,0 +1,101 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.mapreduce.v2.hs;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.ipc.ProtobufRpcEngine;
|
||||
import org.apache.hadoop.ipc.RPC;
|
||||
import org.apache.hadoop.mapreduce.v2.hs.protocol.HSAdminRefreshProtocol;
|
||||
import org.apache.hadoop.mapreduce.v2.hs.protocolPB.HSAdminRefreshProtocolClientSideTranslatorPB;
|
||||
import org.apache.hadoop.mapreduce.v2.hs.protocolPB.HSAdminRefreshProtocolPB;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.security.RefreshUserMappingsProtocol;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.protocolPB.RefreshUserMappingsProtocolClientSideTranslatorPB;
|
||||
import org.apache.hadoop.security.protocolPB.RefreshUserMappingsProtocolPB;
|
||||
import org.apache.hadoop.tools.GetUserMappingsProtocol;
|
||||
import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolClientSideTranslatorPB;
|
||||
import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolPB;
|
||||
|
||||
@Private
|
||||
public class HSProxies {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(HSProxies.class);
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public static <T> T createProxy(Configuration conf, InetSocketAddress hsaddr,
|
||||
Class<T> xface, UserGroupInformation ugi) throws IOException {
|
||||
|
||||
T proxy;
|
||||
if (xface == RefreshUserMappingsProtocol.class) {
|
||||
proxy = (T) createHSProxyWithRefreshUserMappingsProtocol(hsaddr, conf,
|
||||
ugi);
|
||||
} else if (xface == GetUserMappingsProtocol.class) {
|
||||
proxy = (T) createHSProxyWithGetUserMappingsProtocol(hsaddr, conf, ugi);
|
||||
} else if (xface == HSAdminRefreshProtocol.class) {
|
||||
proxy = (T) createHSProxyWithHSAdminRefreshProtocol(hsaddr, conf, ugi);
|
||||
} else {
|
||||
String message = "Unsupported protocol found when creating the proxy "
|
||||
+ "connection to History server: "
|
||||
+ ((xface != null) ? xface.getClass().getName() : "null");
|
||||
LOG.error(message);
|
||||
throw new IllegalStateException(message);
|
||||
}
|
||||
return proxy;
|
||||
}
|
||||
|
||||
private static RefreshUserMappingsProtocol createHSProxyWithRefreshUserMappingsProtocol(
|
||||
InetSocketAddress address, Configuration conf, UserGroupInformation ugi)
|
||||
throws IOException {
|
||||
RefreshUserMappingsProtocolPB proxy = (RefreshUserMappingsProtocolPB) createHSProxy(
|
||||
address, conf, ugi, RefreshUserMappingsProtocolPB.class, 0);
|
||||
return new RefreshUserMappingsProtocolClientSideTranslatorPB(proxy);
|
||||
}
|
||||
|
||||
private static GetUserMappingsProtocol createHSProxyWithGetUserMappingsProtocol(
|
||||
InetSocketAddress address, Configuration conf, UserGroupInformation ugi)
|
||||
throws IOException {
|
||||
GetUserMappingsProtocolPB proxy = (GetUserMappingsProtocolPB) createHSProxy(
|
||||
address, conf, ugi, GetUserMappingsProtocolPB.class, 0);
|
||||
return new GetUserMappingsProtocolClientSideTranslatorPB(proxy);
|
||||
}
|
||||
|
||||
private static HSAdminRefreshProtocol createHSProxyWithHSAdminRefreshProtocol(
|
||||
InetSocketAddress hsaddr, Configuration conf, UserGroupInformation ugi)
|
||||
throws IOException {
|
||||
HSAdminRefreshProtocolPB proxy = (HSAdminRefreshProtocolPB) createHSProxy(
|
||||
hsaddr, conf, ugi, HSAdminRefreshProtocolPB.class, 0);
|
||||
return new HSAdminRefreshProtocolClientSideTranslatorPB(proxy);
|
||||
}
|
||||
|
||||
private static Object createHSProxy(InetSocketAddress address,
|
||||
Configuration conf, UserGroupInformation ugi, Class<?> xface,
|
||||
int rpcTimeout) throws IOException {
|
||||
RPC.setProtocolEngine(conf, xface, ProtobufRpcEngine.class);
|
||||
Object proxy = RPC.getProxy(xface, RPC.getProtocolVersion(xface), address,
|
||||
ugi, conf, NetUtils.getDefaultSocketFactory(conf), rpcTimeout);
|
||||
return proxy;
|
||||
}
|
||||
}
|
@ -26,6 +26,7 @@
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.mapred.JobConf;
|
||||
import org.apache.hadoop.mapreduce.MRConfig;
|
||||
import org.apache.hadoop.mapreduce.v2.hs.server.HSAdminServer;
|
||||
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
|
||||
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||
import org.apache.hadoop.metrics2.source.JvmMetrics;
|
||||
@ -59,6 +60,7 @@ public class JobHistoryServer extends CompositeService {
|
||||
private JobHistory jobHistoryService;
|
||||
private JHSDelegationTokenSecretManager jhsDTSecretManager;
|
||||
private AggregatedLogDeletionService aggLogDelService;
|
||||
private HSAdminServer hsAdminServer;
|
||||
|
||||
public JobHistoryServer() {
|
||||
super(JobHistoryServer.class.getName());
|
||||
@ -81,9 +83,11 @@ protected void serviceInit(Configuration conf) throws Exception {
|
||||
clientService = new HistoryClientService(historyContext,
|
||||
this.jhsDTSecretManager);
|
||||
aggLogDelService = new AggregatedLogDeletionService();
|
||||
hsAdminServer = new HSAdminServer();
|
||||
addService(jobHistoryService);
|
||||
addService(clientService);
|
||||
addService(aggLogDelService);
|
||||
addService(hsAdminServer);
|
||||
super.serviceInit(config);
|
||||
}
|
||||
|
||||
|
@ -0,0 +1,249 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.mapreduce.v2.hs.client;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.Arrays;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.conf.Configured;
|
||||
import org.apache.hadoop.mapreduce.v2.hs.HSProxies;
|
||||
import org.apache.hadoop.mapreduce.v2.hs.protocol.HSAdminRefreshProtocol;
|
||||
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
|
||||
import org.apache.hadoop.security.RefreshUserMappingsProtocol;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.tools.GetUserMappingsProtocol;
|
||||
import org.apache.hadoop.util.Tool;
|
||||
import org.apache.hadoop.util.ToolRunner;
|
||||
|
||||
@Private
|
||||
public class HSAdmin extends Configured implements Tool {
|
||||
|
||||
public HSAdmin() {
|
||||
super();
|
||||
}
|
||||
|
||||
public HSAdmin(Configuration conf) {
|
||||
super(conf);
|
||||
}
|
||||
|
||||
/**
|
||||
* Displays format of commands.
|
||||
*
|
||||
* @param cmd
|
||||
* The command that is being executed.
|
||||
*/
|
||||
private static void printUsage(String cmd) {
|
||||
if ("-refreshUserToGroupsMappings".equals(cmd)) {
|
||||
System.err
|
||||
.println("Usage: mapred hsadmin [-refreshUserToGroupsMappings]");
|
||||
} else if ("-refreshSuperUserGroupsConfiguration".equals(cmd)) {
|
||||
System.err
|
||||
.println("Usage: mapred hsadmin [-refreshSuperUserGroupsConfiguration]");
|
||||
} else if ("-refreshAdminAcls".equals(cmd)) {
|
||||
System.err.println("Usage: mapred hsadmin [-refreshAdminAcls]");
|
||||
} else if ("-getGroups".equals(cmd)) {
|
||||
System.err.println("Usage: mapred hsadmin" + " [-getGroups [username]]");
|
||||
} else {
|
||||
System.err.println("Usage: mapred hsadmin");
|
||||
System.err.println(" [-refreshUserToGroupsMappings]");
|
||||
System.err.println(" [-refreshSuperUserGroupsConfiguration]");
|
||||
System.err.println(" [-refreshAdminAcls]");
|
||||
System.err.println(" [-getGroups [username]]");
|
||||
System.err.println(" [-help [cmd]]");
|
||||
System.err.println();
|
||||
ToolRunner.printGenericCommandUsage(System.err);
|
||||
}
|
||||
}
|
||||
|
||||
private static void printHelp(String cmd) {
|
||||
String summary = "hsadmin is the command to execute Job History server administrative commands.\n"
|
||||
+ "The full syntax is: \n\n"
|
||||
+ "mapred hsadmin"
|
||||
+ " [-refreshUserToGroupsMappings]"
|
||||
+ " [-refreshSuperUserGroupsConfiguration]"
|
||||
+ " [-refreshAdminAcls]"
|
||||
+ " [-getGroups [username]]" + " [-help [cmd]]\n";
|
||||
|
||||
String refreshUserToGroupsMappings = "-refreshUserToGroupsMappings: Refresh user-to-groups mappings\n";
|
||||
|
||||
String refreshSuperUserGroupsConfiguration = "-refreshSuperUserGroupsConfiguration: Refresh superuser proxy groups mappings\n";
|
||||
|
||||
String refreshAdminAcls = "-refreshAdminAcls: Refresh acls for administration of Job history server\n";
|
||||
|
||||
String getGroups = "-getGroups [username]: Get the groups which given user belongs to\n";
|
||||
|
||||
String help = "-help [cmd]: \tDisplays help for the given command or all commands if none\n"
|
||||
+ "\t\tis specified.\n";
|
||||
|
||||
if ("refreshUserToGroupsMappings".equals(cmd)) {
|
||||
System.out.println(refreshUserToGroupsMappings);
|
||||
} else if ("help".equals(cmd)) {
|
||||
System.out.println(help);
|
||||
} else if ("refreshSuperUserGroupsConfiguration".equals(cmd)) {
|
||||
System.out.println(refreshSuperUserGroupsConfiguration);
|
||||
} else if ("refreshAdminAcls".equals(cmd)) {
|
||||
System.out.println(refreshAdminAcls);
|
||||
} else if ("getGroups".equals(cmd)) {
|
||||
System.out.println(getGroups);
|
||||
} else {
|
||||
System.out.println(summary);
|
||||
System.out.println(refreshUserToGroupsMappings);
|
||||
System.out.println(refreshSuperUserGroupsConfiguration);
|
||||
System.out.println(refreshAdminAcls);
|
||||
System.out.println(getGroups);
|
||||
System.out.println(help);
|
||||
System.out.println();
|
||||
ToolRunner.printGenericCommandUsage(System.out);
|
||||
}
|
||||
}
|
||||
|
||||
private int getGroups(String[] usernames) throws IOException {
|
||||
// Get groups users belongs to
|
||||
if (usernames.length == 0) {
|
||||
usernames = new String[] { UserGroupInformation.getCurrentUser()
|
||||
.getUserName() };
|
||||
}
|
||||
|
||||
// Get the current configuration
|
||||
Configuration conf = getConf();
|
||||
|
||||
InetSocketAddress address = conf.getSocketAddr(
|
||||
JHAdminConfig.JHS_ADMIN_ADDRESS,
|
||||
JHAdminConfig.DEFAULT_JHS_ADMIN_ADDRESS,
|
||||
JHAdminConfig.DEFAULT_JHS_ADMIN_PORT);
|
||||
|
||||
GetUserMappingsProtocol getUserMappingProtocol = HSProxies.createProxy(
|
||||
conf, address, GetUserMappingsProtocol.class,
|
||||
UserGroupInformation.getCurrentUser());
|
||||
for (String username : usernames) {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append(username + " :");
|
||||
for (String group : getUserMappingProtocol.getGroupsForUser(username)) {
|
||||
sb.append(" ");
|
||||
sb.append(group);
|
||||
}
|
||||
System.out.println(sb);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
private int refreshUserToGroupsMappings() throws IOException {
|
||||
// Get the current configuration
|
||||
Configuration conf = getConf();
|
||||
|
||||
InetSocketAddress address = conf.getSocketAddr(
|
||||
JHAdminConfig.JHS_ADMIN_ADDRESS,
|
||||
JHAdminConfig.DEFAULT_JHS_ADMIN_ADDRESS,
|
||||
JHAdminConfig.DEFAULT_JHS_ADMIN_PORT);
|
||||
|
||||
RefreshUserMappingsProtocol refreshProtocol = HSProxies.createProxy(conf,
|
||||
address, RefreshUserMappingsProtocol.class,
|
||||
UserGroupInformation.getCurrentUser());
|
||||
// Refresh the user-to-groups mappings
|
||||
refreshProtocol.refreshUserToGroupsMappings();
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
private int refreshSuperUserGroupsConfiguration() throws IOException {
|
||||
// Refresh the super-user groups
|
||||
Configuration conf = getConf();
|
||||
InetSocketAddress address = conf.getSocketAddr(
|
||||
JHAdminConfig.JHS_ADMIN_ADDRESS,
|
||||
JHAdminConfig.DEFAULT_JHS_ADMIN_ADDRESS,
|
||||
JHAdminConfig.DEFAULT_JHS_ADMIN_PORT);
|
||||
|
||||
RefreshUserMappingsProtocol refreshProtocol = HSProxies.createProxy(conf,
|
||||
address, RefreshUserMappingsProtocol.class,
|
||||
UserGroupInformation.getCurrentUser());
|
||||
// Refresh the super-user group mappings
|
||||
refreshProtocol.refreshSuperUserGroupsConfiguration();
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
private int refreshAdminAcls() throws IOException {
|
||||
// Refresh the admin acls
|
||||
Configuration conf = getConf();
|
||||
InetSocketAddress address = conf.getSocketAddr(
|
||||
JHAdminConfig.JHS_ADMIN_ADDRESS,
|
||||
JHAdminConfig.DEFAULT_JHS_ADMIN_ADDRESS,
|
||||
JHAdminConfig.DEFAULT_JHS_ADMIN_PORT);
|
||||
|
||||
HSAdminRefreshProtocol refreshProtocol = HSProxies.createProxy(conf,
|
||||
address, HSAdminRefreshProtocol.class,
|
||||
UserGroupInformation.getCurrentUser());
|
||||
// Refresh the user-to-groups mappings
|
||||
refreshProtocol.refreshAdminAcls();
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int run(String[] args) throws Exception {
|
||||
if (args.length < 1) {
|
||||
printUsage("");
|
||||
return -1;
|
||||
}
|
||||
|
||||
int exitCode = -1;
|
||||
int i = 0;
|
||||
String cmd = args[i++];
|
||||
|
||||
if ("-refreshUserToGroupsMappings".equals(cmd)
|
||||
|| "-refreshSuperUserGroupsConfiguration".equals(cmd)
|
||||
|| "-refreshAdminAcls".equals(cmd)) {
|
||||
if (args.length != 1) {
|
||||
printUsage(cmd);
|
||||
return exitCode;
|
||||
}
|
||||
}
|
||||
|
||||
exitCode = 0;
|
||||
if ("-refreshUserToGroupsMappings".equals(cmd)) {
|
||||
exitCode = refreshUserToGroupsMappings();
|
||||
} else if ("-refreshSuperUserGroupsConfiguration".equals(cmd)) {
|
||||
exitCode = refreshSuperUserGroupsConfiguration();
|
||||
} else if ("-refreshAdminAcls".equals(cmd)) {
|
||||
exitCode = refreshAdminAcls();
|
||||
} else if ("-getGroups".equals(cmd)) {
|
||||
String[] usernames = Arrays.copyOfRange(args, i, args.length);
|
||||
exitCode = getGroups(usernames);
|
||||
} else if ("-help".equals(cmd)) {
|
||||
if (i < args.length) {
|
||||
printHelp(args[i]);
|
||||
} else {
|
||||
printHelp("");
|
||||
}
|
||||
} else {
|
||||
exitCode = -1;
|
||||
System.err.println(cmd.substring(1) + ": Unknown command");
|
||||
printUsage("");
|
||||
}
|
||||
return exitCode;
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
int result = ToolRunner.run(new HSAdmin(), args);
|
||||
System.exit(result);
|
||||
}
|
||||
}
|
@ -0,0 +1,38 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.mapreduce.v2.hs.protocol;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||
import org.apache.hadoop.security.KerberosInfo;
|
||||
import org.apache.hadoop.security.RefreshUserMappingsProtocol;
|
||||
import org.apache.hadoop.tools.GetUserMappingsProtocol;
|
||||
|
||||
/**
|
||||
* Protocol use
|
||||
*
|
||||
*/
|
||||
@KerberosInfo(serverPrincipal = CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_USER_NAME_KEY)
|
||||
@Private
|
||||
@InterfaceStability.Evolving
|
||||
public interface HSAdminProtocol extends GetUserMappingsProtocol,
|
||||
RefreshUserMappingsProtocol, HSAdminRefreshProtocol {
|
||||
|
||||
}
|
@ -0,0 +1,43 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.mapreduce.v2.hs.protocol;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||
import org.apache.hadoop.security.KerberosInfo;
|
||||
|
||||
/**
|
||||
* Protocol use
|
||||
*
|
||||
*/
|
||||
@KerberosInfo(serverPrincipal = CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_USER_NAME_KEY)
|
||||
@Private
|
||||
@InterfaceStability.Evolving
|
||||
public interface HSAdminRefreshProtocol {
|
||||
/**
|
||||
* Refresh admin acls.
|
||||
*
|
||||
* @throws IOException
|
||||
*/
|
||||
public void refreshAdminAcls() throws IOException;
|
||||
|
||||
}
|
@ -0,0 +1,73 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.mapreduce.v2.hs.protocolPB;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.ipc.ProtobufHelper;
|
||||
import org.apache.hadoop.ipc.ProtocolMetaInterface;
|
||||
import org.apache.hadoop.ipc.RPC;
|
||||
import org.apache.hadoop.ipc.RpcClientUtil;
|
||||
import org.apache.hadoop.mapreduce.v2.hs.proto.HSAdminRefreshProtocolProtos.RefreshAdminAclsRequestProto;
|
||||
import org.apache.hadoop.mapreduce.v2.hs.protocol.HSAdminRefreshProtocol;
|
||||
|
||||
import com.google.protobuf.RpcController;
|
||||
import com.google.protobuf.ServiceException;
|
||||
|
||||
@Private
|
||||
public class HSAdminRefreshProtocolClientSideTranslatorPB implements
|
||||
ProtocolMetaInterface, HSAdminRefreshProtocol, Closeable {
|
||||
|
||||
/** RpcController is not used and hence is set to null */
|
||||
private final static RpcController NULL_CONTROLLER = null;
|
||||
|
||||
private final HSAdminRefreshProtocolPB rpcProxy;
|
||||
|
||||
private final static RefreshAdminAclsRequestProto VOID_REFRESH_ADMIN_ACLS_REQUEST = RefreshAdminAclsRequestProto
|
||||
.newBuilder().build();
|
||||
|
||||
public HSAdminRefreshProtocolClientSideTranslatorPB(
|
||||
HSAdminRefreshProtocolPB rpcProxy) {
|
||||
this.rpcProxy = rpcProxy;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
RPC.stopProxy(rpcProxy);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void refreshAdminAcls() throws IOException {
|
||||
try {
|
||||
rpcProxy.refreshAdminAcls(NULL_CONTROLLER,
|
||||
VOID_REFRESH_ADMIN_ACLS_REQUEST);
|
||||
} catch (ServiceException se) {
|
||||
throw ProtobufHelper.getRemoteException(se);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isMethodSupported(String methodName) throws IOException {
|
||||
return RpcClientUtil.isMethodSupported(rpcProxy,
|
||||
HSAdminRefreshProtocolPB.class, RPC.RpcKind.RPC_PROTOCOL_BUFFER,
|
||||
RPC.getProtocolVersion(HSAdminRefreshProtocolPB.class), methodName);
|
||||
}
|
||||
}
|
@ -0,0 +1,33 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.mapreduce.v2.hs.protocolPB;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||
import org.apache.hadoop.ipc.ProtocolInfo;
|
||||
import org.apache.hadoop.mapreduce.v2.hs.proto.HSAdminRefreshProtocolProtos.HSAdminRefreshProtocolService;
|
||||
import org.apache.hadoop.security.KerberosInfo;
|
||||
|
||||
@KerberosInfo(serverPrincipal = CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_USER_NAME_KEY)
|
||||
@ProtocolInfo(protocolName = "org.apache.hadoop.mapreduce.v2.hs.protocol.HSAdminRefreshProtocol", protocolVersion = 1)
|
||||
@Private
|
||||
@InterfaceStability.Evolving
|
||||
public interface HSAdminRefreshProtocolPB extends
|
||||
HSAdminRefreshProtocolService.BlockingInterface {
|
||||
}
|
@ -0,0 +1,57 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.mapreduce.v2.hs.protocolPB;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.mapreduce.v2.hs.proto.HSAdminRefreshProtocolProtos.RefreshAdminAclsResponseProto;
|
||||
import org.apache.hadoop.mapreduce.v2.hs.proto.HSAdminRefreshProtocolProtos.RefreshAdminAclsRequestProto;
|
||||
import org.apache.hadoop.mapreduce.v2.hs.protocol.HSAdminRefreshProtocol;
|
||||
|
||||
import com.google.protobuf.RpcController;
|
||||
import com.google.protobuf.ServiceException;
|
||||
|
||||
@Private
|
||||
public class HSAdminRefreshProtocolServerSideTranslatorPB implements
|
||||
HSAdminRefreshProtocolPB {
|
||||
|
||||
private final HSAdminRefreshProtocol impl;
|
||||
|
||||
private final static RefreshAdminAclsResponseProto VOID_REFRESH_ADMIN_ACLS_RESPONSE = RefreshAdminAclsResponseProto
|
||||
.newBuilder().build();
|
||||
|
||||
public HSAdminRefreshProtocolServerSideTranslatorPB(
|
||||
HSAdminRefreshProtocol impl) {
|
||||
this.impl = impl;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RefreshAdminAclsResponseProto refreshAdminAcls(
|
||||
RpcController controller, RefreshAdminAclsRequestProto request)
|
||||
throws ServiceException {
|
||||
try {
|
||||
impl.refreshAdminAcls();
|
||||
} catch (IOException e) {
|
||||
throw new ServiceException(e);
|
||||
}
|
||||
return VOID_REFRESH_ADMIN_ACLS_RESPONSE;
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,196 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.mapreduce.v2.hs.server;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.ipc.ProtobufRpcEngine;
|
||||
import org.apache.hadoop.ipc.RPC;
|
||||
import org.apache.hadoop.ipc.WritableRpcEngine;
|
||||
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
|
||||
import org.apache.hadoop.security.AccessControlException;
|
||||
import org.apache.hadoop.security.Groups;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.authorize.AccessControlList;
|
||||
import org.apache.hadoop.security.authorize.ProxyUsers;
|
||||
import org.apache.hadoop.security.proto.RefreshUserMappingsProtocolProtos.RefreshUserMappingsProtocolService;
|
||||
import org.apache.hadoop.security.protocolPB.RefreshUserMappingsProtocolPB;
|
||||
import org.apache.hadoop.security.protocolPB.RefreshUserMappingsProtocolServerSideTranslatorPB;
|
||||
import org.apache.hadoop.service.AbstractService;
|
||||
import org.apache.hadoop.tools.proto.GetUserMappingsProtocolProtos.GetUserMappingsProtocolService;
|
||||
import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolPB;
|
||||
import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolServerSideTranslatorPB;
|
||||
import org.apache.hadoop.mapreduce.v2.hs.HSAuditLogger;
|
||||
import org.apache.hadoop.mapreduce.v2.hs.HSAuditLogger.AuditConstants;
|
||||
import org.apache.hadoop.mapreduce.v2.hs.proto.HSAdminRefreshProtocolProtos.HSAdminRefreshProtocolService;
|
||||
import org.apache.hadoop.mapreduce.v2.hs.protocol.HSAdminProtocol;
|
||||
import org.apache.hadoop.mapreduce.v2.hs.protocolPB.HSAdminRefreshProtocolPB;
|
||||
import org.apache.hadoop.mapreduce.v2.hs.protocolPB.HSAdminRefreshProtocolServerSideTranslatorPB;
|
||||
|
||||
import com.google.protobuf.BlockingService;
|
||||
|
||||
@Private
|
||||
public class HSAdminServer extends AbstractService implements HSAdminProtocol {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(HSAdminServer.class);
|
||||
private AccessControlList adminAcl;
|
||||
|
||||
/** The RPC server that listens to requests from clients */
|
||||
protected RPC.Server clientRpcServer;
|
||||
protected InetSocketAddress clientRpcAddress;
|
||||
private static final String HISTORY_ADMIN_SERVER = "HSAdminServer";
|
||||
|
||||
public HSAdminServer() {
|
||||
super(HSAdminServer.class.getName());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void serviceInit(Configuration conf) throws Exception {
|
||||
RPC.setProtocolEngine(conf, RefreshUserMappingsProtocolPB.class,
|
||||
ProtobufRpcEngine.class);
|
||||
|
||||
RefreshUserMappingsProtocolServerSideTranslatorPB refreshUserMappingXlator = new RefreshUserMappingsProtocolServerSideTranslatorPB(
|
||||
this);
|
||||
BlockingService refreshUserMappingService = RefreshUserMappingsProtocolService
|
||||
.newReflectiveBlockingService(refreshUserMappingXlator);
|
||||
|
||||
GetUserMappingsProtocolServerSideTranslatorPB getUserMappingXlator = new GetUserMappingsProtocolServerSideTranslatorPB(
|
||||
this);
|
||||
BlockingService getUserMappingService = GetUserMappingsProtocolService
|
||||
.newReflectiveBlockingService(getUserMappingXlator);
|
||||
|
||||
HSAdminRefreshProtocolServerSideTranslatorPB refreshHSAdminProtocolXlator = new HSAdminRefreshProtocolServerSideTranslatorPB(
|
||||
this);
|
||||
BlockingService refreshHSAdminProtocolService = HSAdminRefreshProtocolService
|
||||
.newReflectiveBlockingService(refreshHSAdminProtocolXlator);
|
||||
|
||||
WritableRpcEngine.ensureInitialized();
|
||||
|
||||
clientRpcAddress = conf.getSocketAddr(JHAdminConfig.JHS_ADMIN_ADDRESS,
|
||||
JHAdminConfig.DEFAULT_JHS_ADMIN_ADDRESS,
|
||||
JHAdminConfig.DEFAULT_JHS_ADMIN_PORT);
|
||||
clientRpcServer = new RPC.Builder(conf)
|
||||
.setProtocol(RefreshUserMappingsProtocolPB.class)
|
||||
.setInstance(refreshUserMappingService)
|
||||
.setBindAddress(clientRpcAddress.getHostName())
|
||||
.setPort(clientRpcAddress.getPort()).setVerbose(false).build();
|
||||
|
||||
addProtocol(conf, GetUserMappingsProtocolPB.class, getUserMappingService);
|
||||
addProtocol(conf, HSAdminRefreshProtocolPB.class, refreshHSAdminProtocolService);
|
||||
|
||||
adminAcl = new AccessControlList(conf.get(JHAdminConfig.JHS_ADMIN_ACL,
|
||||
JHAdminConfig.DEFAULT_JHS_ADMIN_ACL));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void serviceStart() throws Exception {
|
||||
clientRpcServer.start();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void serviceStop() throws Exception {
|
||||
if (clientRpcServer != null) {
|
||||
clientRpcServer.stop();
|
||||
}
|
||||
}
|
||||
|
||||
private void addProtocol(Configuration conf, Class<?> protocol,
|
||||
BlockingService blockingService) throws IOException {
|
||||
RPC.setProtocolEngine(conf, protocol, ProtobufRpcEngine.class);
|
||||
clientRpcServer.addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, protocol,
|
||||
blockingService);
|
||||
}
|
||||
|
||||
private UserGroupInformation checkAcls(String method) throws IOException {
|
||||
UserGroupInformation user;
|
||||
try {
|
||||
user = UserGroupInformation.getCurrentUser();
|
||||
} catch (IOException ioe) {
|
||||
LOG.warn("Couldn't get current user", ioe);
|
||||
|
||||
HSAuditLogger.logFailure("UNKNOWN", method, adminAcl.toString(),
|
||||
HISTORY_ADMIN_SERVER, "Couldn't get current user");
|
||||
|
||||
throw ioe;
|
||||
}
|
||||
|
||||
if (!adminAcl.isUserAllowed(user)) {
|
||||
LOG.warn("User " + user.getShortUserName() + " doesn't have permission"
|
||||
+ " to call '" + method + "'");
|
||||
|
||||
HSAuditLogger.logFailure(user.getShortUserName(), method,
|
||||
adminAcl.toString(), HISTORY_ADMIN_SERVER,
|
||||
AuditConstants.UNAUTHORIZED_USER);
|
||||
|
||||
throw new AccessControlException("User " + user.getShortUserName()
|
||||
+ " doesn't have permission" + " to call '" + method + "'");
|
||||
}
|
||||
LOG.info("HS Admin: " + method + " invoked by user "
|
||||
+ user.getShortUserName());
|
||||
|
||||
return user;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String[] getGroupsForUser(String user) throws IOException {
|
||||
return UserGroupInformation.createRemoteUser(user).getGroupNames();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void refreshUserToGroupsMappings() throws IOException {
|
||||
|
||||
UserGroupInformation user = checkAcls("refreshUserToGroupsMappings");
|
||||
|
||||
Groups.getUserToGroupsMappingService().refresh();
|
||||
|
||||
HSAuditLogger.logSuccess(user.getShortUserName(),
|
||||
"refreshUserToGroupsMappings", HISTORY_ADMIN_SERVER);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void refreshSuperUserGroupsConfiguration() throws IOException {
|
||||
UserGroupInformation user = checkAcls("refreshSuperUserGroupsConfiguration");
|
||||
|
||||
ProxyUsers.refreshSuperUserGroupsConfiguration(createConf());
|
||||
|
||||
HSAuditLogger.logSuccess(user.getShortUserName(),
|
||||
"refreshSuperUserGroupsConfiguration", HISTORY_ADMIN_SERVER);
|
||||
}
|
||||
|
||||
protected Configuration createConf() {
|
||||
return new Configuration();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void refreshAdminAcls() throws IOException {
|
||||
UserGroupInformation user = checkAcls("refreshAdminAcls");
|
||||
|
||||
Configuration conf = createConf();
|
||||
adminAcl = new AccessControlList(conf.get(JHAdminConfig.JHS_ADMIN_ACL,
|
||||
JHAdminConfig.DEFAULT_JHS_ADMIN_ACL));
|
||||
HSAuditLogger.logSuccess(user.getShortUserName(), "refreshAdminAcls",
|
||||
HISTORY_ADMIN_SERVER);
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,45 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
option java_package = "org.apache.hadoop.mapreduce.v2.hs.proto";
|
||||
option java_outer_classname = "HSAdminRefreshProtocolProtos";
|
||||
option java_generic_services = true;
|
||||
option java_generate_equals_and_hash = true;
|
||||
|
||||
/**
|
||||
* refresh admin acls request.
|
||||
*/
|
||||
message RefreshAdminAclsRequestProto {
|
||||
}
|
||||
|
||||
/**
|
||||
* Response for refresh admin acls.
|
||||
*/
|
||||
message RefreshAdminAclsResponseProto {
|
||||
}
|
||||
|
||||
/**
|
||||
* Refresh Protocols implemented by the History server
|
||||
*/
|
||||
service HSAdminRefreshProtocolService {
|
||||
/**
|
||||
* Refresh admin acls
|
||||
*/
|
||||
rpc refreshAdminAcls(RefreshAdminAclsRequestProto)
|
||||
returns(RefreshAdminAclsResponseProto);
|
||||
}
|
@ -75,7 +75,7 @@ public void testStartStopServer() throws Exception {
|
||||
Configuration config = new Configuration();
|
||||
historyServer.init(config);
|
||||
assertEquals(STATE.INITED, historyServer.getServiceState());
|
||||
assertEquals(3, historyServer.getServices().size());
|
||||
assertEquals(4, historyServer.getServices().size());
|
||||
HistoryClientService historyService = historyServer.getClientService();
|
||||
assertNotNull(historyServer.getClientService());
|
||||
assertEquals(STATE.INITED, historyService.getServiceState());
|
||||
|
@ -0,0 +1,241 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.mapreduce.v2.hs.server;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.HadoopIllegalArgumentException;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
import org.apache.hadoop.mapreduce.v2.hs.client.HSAdmin;
|
||||
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
|
||||
import org.apache.hadoop.security.GroupMappingServiceProvider;
|
||||
import org.apache.hadoop.security.Groups;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.authorize.ProxyUsers;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import org.apache.hadoop.security.authorize.AuthorizationException;
|
||||
|
||||
public class TestHSAdminServer {
|
||||
private HSAdminServer hsAdminServer = null;
|
||||
private HSAdmin hsAdminClient = null;
|
||||
Configuration conf = null;
|
||||
private static long groupRefreshTimeoutSec = 1;
|
||||
|
||||
public static class MockUnixGroupsMapping implements
|
||||
GroupMappingServiceProvider {
|
||||
private int i = 0;
|
||||
|
||||
@Override
|
||||
public List<String> getGroups(String user) throws IOException {
|
||||
System.out.println("Getting groups in MockUnixGroupsMapping");
|
||||
String g1 = user + (10 * i + 1);
|
||||
String g2 = user + (10 * i + 2);
|
||||
List<String> l = new ArrayList<String>(2);
|
||||
l.add(g1);
|
||||
l.add(g2);
|
||||
i++;
|
||||
return l;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cacheGroupsRefresh() throws IOException {
|
||||
System.out.println("Refreshing groups in MockUnixGroupsMapping");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cacheGroupsAdd(List<String> groups) throws IOException {
|
||||
}
|
||||
}
|
||||
|
||||
@Before
|
||||
public void init() throws HadoopIllegalArgumentException, IOException {
|
||||
conf = new Configuration();
|
||||
conf.set(JHAdminConfig.JHS_ADMIN_ADDRESS, "0.0.0.0:0");
|
||||
conf.setClass("hadoop.security.group.mapping", MockUnixGroupsMapping.class,
|
||||
GroupMappingServiceProvider.class);
|
||||
conf.setLong("hadoop.security.groups.cache.secs", groupRefreshTimeoutSec);
|
||||
Groups.getUserToGroupsMappingService(conf);
|
||||
hsAdminServer = new HSAdminServer() {
|
||||
@Override
|
||||
protected Configuration createConf() {
|
||||
return conf;
|
||||
}
|
||||
};
|
||||
hsAdminServer.init(conf);
|
||||
hsAdminServer.start();
|
||||
conf.setSocketAddr(JHAdminConfig.JHS_ADMIN_ADDRESS,
|
||||
hsAdminServer.clientRpcServer.getListenerAddress());
|
||||
hsAdminClient = new HSAdmin(conf);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetGroups() throws Exception {
|
||||
// Get the current user
|
||||
String user = UserGroupInformation.getCurrentUser().getUserName();
|
||||
String[] args = new String[2];
|
||||
args[0] = "-getGroups";
|
||||
args[1] = user;
|
||||
// Run the getGroups command
|
||||
int exitCode = hsAdminClient.run(args);
|
||||
assertEquals("Exit code should be 0 but was: " + exitCode, 0, exitCode);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRefreshUserToGroupsMappings() throws Exception {
|
||||
|
||||
String[] args = new String[] { "-refreshUserToGroupsMappings" };
|
||||
Groups groups = Groups.getUserToGroupsMappingService(conf);
|
||||
String user = UserGroupInformation.getCurrentUser().getUserName();
|
||||
System.out.println("first attempt:");
|
||||
List<String> g1 = groups.getGroups(user);
|
||||
String[] str_groups = new String[g1.size()];
|
||||
g1.toArray(str_groups);
|
||||
System.out.println(Arrays.toString(str_groups));
|
||||
|
||||
// Now groups of this user has changed but getGroups returns from the
|
||||
// cache,so we would see same groups as before
|
||||
System.out.println("second attempt, should be same:");
|
||||
List<String> g2 = groups.getGroups(user);
|
||||
g2.toArray(str_groups);
|
||||
System.out.println(Arrays.toString(str_groups));
|
||||
for (int i = 0; i < g2.size(); i++) {
|
||||
assertEquals("Should be same group ", g1.get(i), g2.get(i));
|
||||
}
|
||||
// run the command,which clears the cache
|
||||
hsAdminClient.run(args);
|
||||
System.out
|
||||
.println("third attempt(after refresh command), should be different:");
|
||||
// Now get groups should return new groups
|
||||
List<String> g3 = groups.getGroups(user);
|
||||
g3.toArray(str_groups);
|
||||
System.out.println(Arrays.toString(str_groups));
|
||||
for (int i = 0; i < g3.size(); i++) {
|
||||
assertFalse(
|
||||
"Should be different group: " + g1.get(i) + " and " + g3.get(i), g1
|
||||
.get(i).equals(g3.get(i)));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRefreshSuperUserGroups() throws Exception {
|
||||
|
||||
UserGroupInformation ugi = mock(UserGroupInformation.class);
|
||||
UserGroupInformation superUser = mock(UserGroupInformation.class);
|
||||
|
||||
when(ugi.getRealUser()).thenReturn(superUser);
|
||||
when(superUser.getShortUserName()).thenReturn("superuser");
|
||||
when(superUser.getUserName()).thenReturn("superuser");
|
||||
when(ugi.getGroupNames()).thenReturn(new String[] { "group3" });
|
||||
when(ugi.getUserName()).thenReturn("regularUser");
|
||||
|
||||
// Set super user groups not to include groups of regularUser
|
||||
conf.set("hadoop.proxyuser.superuser.groups", "group1,group2");
|
||||
conf.set("hadoop.proxyuser.superuser.hosts", "127.0.0.1");
|
||||
String[] args = new String[1];
|
||||
args[0] = "-refreshSuperUserGroupsConfiguration";
|
||||
hsAdminClient.run(args);
|
||||
|
||||
Throwable th = null;
|
||||
try {
|
||||
ProxyUsers.authorize(ugi, "127.0.0.1", conf);
|
||||
} catch (Exception e) {
|
||||
th = e;
|
||||
}
|
||||
// Exception should be thrown
|
||||
assertTrue(th instanceof AuthorizationException);
|
||||
|
||||
// Now add regularUser group to superuser group but not execute
|
||||
// refreshSuperUserGroupMapping
|
||||
conf.set("hadoop.proxyuser.superuser.groups", "group1,group2,group3");
|
||||
|
||||
// Again,lets run ProxyUsers.authorize and see if regularUser can be
|
||||
// impersonated
|
||||
// resetting th
|
||||
th = null;
|
||||
try {
|
||||
ProxyUsers.authorize(ugi, "127.0.0.1", conf);
|
||||
} catch (Exception e) {
|
||||
th = e;
|
||||
}
|
||||
// Exception should be thrown again since we didn't refresh the configs
|
||||
assertTrue(th instanceof AuthorizationException);
|
||||
|
||||
// Lets refresh the config by running refreshSuperUserGroupsConfiguration
|
||||
hsAdminClient.run(args);
|
||||
|
||||
th = null;
|
||||
|
||||
try {
|
||||
ProxyUsers.authorize(ugi, "127.0.0.1", conf);
|
||||
} catch (Exception e) {
|
||||
th = e;
|
||||
}
|
||||
// No exception thrown since regularUser can be impersonated.
|
||||
assertNull("Unexpected exception thrown: " + th, th);
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRefreshAdminAcls() throws Exception {
|
||||
// Setting current user to admin acl
|
||||
conf.set(JHAdminConfig.JHS_ADMIN_ACL, UserGroupInformation.getCurrentUser()
|
||||
.getUserName());
|
||||
String[] args = new String[1];
|
||||
args[0] = "-refreshAdminAcls";
|
||||
hsAdminClient.run(args);
|
||||
|
||||
// Now I should be able to run any hsadmin command without any exception
|
||||
// being thrown
|
||||
args[0] = "-refreshSuperUserGroupsConfiguration";
|
||||
hsAdminClient.run(args);
|
||||
|
||||
// Lets remove current user from admin acl
|
||||
conf.set(JHAdminConfig.JHS_ADMIN_ACL, "notCurrentUser");
|
||||
args[0] = "-refreshAdminAcls";
|
||||
hsAdminClient.run(args);
|
||||
|
||||
// Now I should get an exception if i run any hsadmin command
|
||||
Throwable th = null;
|
||||
args[0] = "-refreshSuperUserGroupsConfiguration";
|
||||
try {
|
||||
hsAdminClient.run(args);
|
||||
} catch (Exception e) {
|
||||
th = e;
|
||||
}
|
||||
assertTrue(th instanceof RemoteException);
|
||||
}
|
||||
|
||||
@After
|
||||
public void cleanUp() {
|
||||
if (hsAdminServer != null)
|
||||
hsAdminServer.stop();
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue
Block a user