YARN-986. Changed client side to be able to figure out the right RM Delegation token for the right ResourceManager when HA is enabled. Contributed by Karthik Kambatla.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1574190 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2014-03-04 20:39:06 +00:00
parent 40464fba22
commit 88245b6a41
14 changed files with 177 additions and 30 deletions

View File

@ -19,7 +19,6 @@
package org.apache.hadoop.mapred;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
@ -56,6 +55,7 @@
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
import org.apache.hadoop.yarn.client.ClientRMProxy;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@ -74,7 +74,7 @@ public class ResourceMgrDelegate extends YarnClient {
@Private
@VisibleForTesting
protected YarnClient client;
private InetSocketAddress rmAddress;
private Text rmDTService;
/**
* Delegate responsible for communicating with the Resource Manager's
@ -91,9 +91,6 @@ public ResourceMgrDelegate(YarnConfiguration conf) {
@Override
protected void serviceInit(Configuration conf) throws Exception {
this.rmAddress = conf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
YarnConfiguration.DEFAULT_RM_ADDRESS,
YarnConfiguration.DEFAULT_RM_PORT);
client.init(conf);
super.serviceInit(conf);
}
@ -155,8 +152,11 @@ public ClusterMetrics getClusterMetrics() throws IOException,
}
}
InetSocketAddress getConnectAddress() {
return rmAddress;
public Text getRMDelegationTokenService() {
if (rmDTService == null) {
rmDTService = ClientRMProxy.getRMDelegationTokenService(conf);
}
return rmDTService;
}
@SuppressWarnings("rawtypes")
@ -164,7 +164,7 @@ public Token getDelegationToken(Text renewer) throws IOException,
InterruptedException {
try {
return ConverterUtils.convertFromYarn(
client.getRMDelegationToken(renewer), rmAddress);
client.getRMDelegationToken(renewer), getRMDelegationTokenService());
} catch (YarnException e) {
throw new IOException(e);
}

View File

@ -188,8 +188,7 @@ void addHistoryToken(Credentials ts) throws IOException, InterruptedException {
* to make sure we add history server delegation tokens to the credentials
*/
RMDelegationTokenSelector tokenSelector = new RMDelegationTokenSelector();
Text service = SecurityUtil.buildTokenService(resMgrDelegate
.getConnectAddress());
Text service = resMgrDelegate.getRMDelegationTokenService();
if (tokenSelector.selectToken(service, ts.getAllTokens()) != null) {
Text hsService = SecurityUtil.buildTokenService(hsProxy
.getConnectAddress());

View File

@ -299,7 +299,7 @@ public void testGetHSDelegationToken() throws Exception {
any(GetDelegationTokenRequest.class));
ResourceMgrDelegate rmDelegate = mock(ResourceMgrDelegate.class);
doReturn(mockRmAddress).when(rmDelegate).getConnectAddress();
doReturn(rmTokenSevice).when(rmDelegate).getRMDelegationTokenService();
ClientCache clientCache = mock(ClientCache.class);
doReturn(mockHsProxy).when(clientCache).getInitializedHSProxy();

View File

@ -252,6 +252,10 @@ Release 2.4.0 - UNRELEASED
YARN-1730. Implemented simple write-locking in the LevelDB based timeline-
store. (Billie Rinaldi via vinodkv)
YARN-986. Changed client side to be able to figure out the right RM Delegation
token for the right ResourceManager when HA is enabled. (Karthik Kambatla via
vinodkv)
OPTIMIZATIONS
BUG FIXES

View File

@ -1170,7 +1170,9 @@ public static List<String> getServiceAddressConfKeys(Configuration conf) {
/**
* Get the socket address for <code>name</code> property as a
* <code>InetSocketAddress</code>.
* <code>InetSocketAddress</code>. On a HA cluster,
* this fetches the address corresponding to the RM identified by
* {@link #RM_HA_ID}.
* @param name property name.
* @param defaultAddress the default value
* @param defaultPort the default port
@ -1227,4 +1229,14 @@ public static boolean useHttps(Configuration conf) {
.get(YARN_HTTP_POLICY_KEY,
YARN_HTTP_POLICY_DEFAULT));
}
@Private
public static String getClusterId(Configuration conf) {
String clusterId = conf.get(YarnConfiguration.RM_CLUSTER_ID);
if (clusterId == null) {
throw new HadoopIllegalArgumentException("Configuration doesn't specify" +
YarnConfiguration.RM_CLUSTER_ID);
}
return clusterId;
}
}

View File

@ -290,7 +290,7 @@ public ApplicationReport getApplicationReport(ApplicationId appId)
org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> amrmToken =
null;
if (token != null) {
amrmToken = ConverterUtils.convertFromYarn(token, null);
amrmToken = ConverterUtils.convertFromYarn(token, (Text) null);
}
return amrmToken;
}

View File

@ -20,23 +20,30 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import com.google.common.base.Joiner;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
import com.google.common.base.Preconditions;
@InterfaceAudience.Public
@InterfaceStability.Stable
public class ClientRMProxy<T> extends RMProxy<T> {
private static final Log LOG = LogFactory.getLog(ClientRMProxy.class);
private static final ClientRMProxy INSTANCE = new ClientRMProxy();
@ -67,7 +74,7 @@ private static void setupTokens(InetSocketAddress resourceManagerAddress)
throws IOException {
// It is assumed for now that the only AMRMToken in AM's UGI is for this
// cluster/RM. TODO: Fix later when we have some kind of cluster-ID as
// default service-address, see YARN-986.
// default service-address, see YARN-1779.
for (Token<? extends TokenIdentifier> token : UserGroupInformation
.getCurrentUser().getTokens()) {
if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
@ -115,4 +122,37 @@ protected void checkAllowedProtocols(Class<?> protocol) {
protocol.isAssignableFrom(ClientRMProtocols.class),
"RM does not support this client protocol");
}
/**
* Get the token service name to be used for RMDelegationToken. Depending
* on whether HA is enabled or not, this method generates the appropriate
* service name as a comma-separated list of service addresses.
*
* @param conf Configuration corresponding to the cluster we need the
* RMDelegationToken for
* @return - Service name for RMDelegationToken
*/
@InterfaceStability.Unstable
public static Text getRMDelegationTokenService(Configuration conf) {
if (HAUtil.isHAEnabled(conf)) {
// Build a list of service addresses to form the service name
ArrayList<String> services = new ArrayList<String>();
YarnConfiguration yarnConf = new YarnConfiguration(conf);
for (String rmId : HAUtil.getRMHAIds(conf)) {
// Set RM_ID to get the corresponding RM_ADDRESS
yarnConf.set(YarnConfiguration.RM_HA_ID, rmId);
services.add(SecurityUtil.buildTokenService(
yarnConf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
YarnConfiguration.DEFAULT_RM_ADDRESS,
YarnConfiguration.DEFAULT_RM_PORT)).toString());
}
return new Text(Joiner.on(',').join(services));
}
// Non-HA case - no need to set RM_ID
return SecurityUtil.buildTokenService(
conf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
YarnConfiguration.DEFAULT_RM_ADDRESS,
YarnConfiguration.DEFAULT_RM_PORT));
}
}

View File

@ -38,6 +38,7 @@
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
import org.apache.hadoop.yarn.client.ClientRMProxy;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.Records;
@ -139,7 +140,9 @@ public void cancel(Token<?> token, Configuration conf) throws IOException,
private static ApplicationClientProtocol getRmClient(Token<?> token,
Configuration conf) throws IOException {
InetSocketAddress addr = SecurityUtil.getTokenServiceAddr(token);
String[] services = token.getService().toString().split(",");
for (String service : services) {
InetSocketAddress addr = NetUtils.createSocketAddr(service);
if (localSecretManager != null) {
// return null if it's our token
if (localServiceAddress.getAddress().isAnyLocalAddress()) {
@ -151,6 +154,7 @@ private static ApplicationClientProtocol getRmClient(Token<?> token,
return null;
}
}
}
return ClientRMProxy.createRMProxy(conf, ApplicationClientProtocol.class);
}

View File

@ -37,6 +37,14 @@ public class RMDelegationTokenSelector implements
private static final Log LOG = LogFactory
.getLog(RMDelegationTokenSelector.class);
private boolean checkService(Text service,
Token<? extends TokenIdentifier> token) {
if (service == null || token.getService() == null) {
return false;
}
return token.getService().toString().contains(service.toString());
}
@SuppressWarnings("unchecked")
public Token<RMDelegationTokenIdentifier> selectToken(Text service,
Collection<Token<? extends TokenIdentifier>> tokens) {
@ -48,7 +56,7 @@ public Token<RMDelegationTokenIdentifier> selectToken(Text service,
LOG.debug("Token kind is " + token.getKind().toString()
+ " and the token's service name is " + token.getService());
if (RMDelegationTokenIdentifier.KIND_NAME.equals(token.getKind())
&& service.equals(token.getService())) {
&& checkService(service, token)) {
return (Token<RMDelegationTokenIdentifier>) token;
}
}

View File

@ -216,7 +216,11 @@ public static ApplicationId toApplicationId(
}
/**
* Convert a protobuf token into a rpc token and set its service
* Convert a protobuf token into a rpc token and set its service. Supposed
* to be used for tokens other than RMDelegationToken. For
* RMDelegationToken, use
* {@link #convertFromYarn(org.apache.hadoop.yarn.api.records.Token,
* org.apache.hadoop.io.Text)} instead.
*
* @param protoToken the yarn token
* @param serviceAddr the connect address for the service
@ -234,4 +238,24 @@ public static <T extends TokenIdentifier> Token<T> convertFromYarn(
}
return token;
}
/**
* Convert a protobuf token into a rpc token and set its service.
*
* @param protoToken the yarn token
* @param service the service for the token
*/
public static <T extends TokenIdentifier> Token<T> convertFromYarn(
org.apache.hadoop.yarn.api.records.Token protoToken,
Text service) {
Token<T> token = new Token<T>(protoToken.getIdentifier().array(),
protoToken.getPassword().array(),
new Text(protoToken.getKind()),
new Text(protoToken.getService()));
if (service != null) {
token.setService(service);
}
return token;
}
}

View File

@ -0,0 +1,59 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.client;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class TestClientRMProxy {
@Test
public void testGetRMDelegationTokenService() {
String defaultRMAddress = YarnConfiguration.DEFAULT_RM_ADDRESS;
YarnConfiguration conf = new YarnConfiguration();
// HA is not enabled
Text tokenService = ClientRMProxy.getRMDelegationTokenService(conf);
String[] services = tokenService.toString().split(",");
assertEquals(1, services.length);
for (String service : services) {
assertTrue("Incorrect token service name",
service.contains(defaultRMAddress));
}
// HA is enabled
conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
conf.set(YarnConfiguration.RM_HA_IDS, "rm1,rm2");
conf.set(HAUtil.addSuffix(YarnConfiguration.RM_HOSTNAME, "rm1"),
"0.0.0.0");
conf.set(HAUtil.addSuffix(YarnConfiguration.RM_HOSTNAME, "rm2"),
"0.0.0.0");
tokenService = ClientRMProxy.getRMDelegationTokenService(conf);
services = tokenService.toString().split(",");
assertEquals(2, services.length);
for (String service : services) {
assertTrue("Incorrect token service name",
service.contains(defaultRMAddress));
}
}
}

View File

@ -72,11 +72,7 @@ protected synchronized void serviceInit(Configuration conf)
}
String rmId = HAUtil.getRMHAId(conf);
String clusterId = conf.get(YarnConfiguration.RM_CLUSTER_ID);
if (clusterId == null) {
throw new YarnRuntimeException(YarnConfiguration.RM_CLUSTER_ID +
" is not specified!");
}
String clusterId = YarnConfiguration.getClusterId(conf);
localActiveNodeInfo = createActiveNodeInfo(clusterId, rmId);
String zkBasePath = conf.get(YarnConfiguration.AUTO_FAILOVER_ZK_BASE_PATH,

View File

@ -548,7 +548,7 @@ protected abstract void removeApplicationStateInternal(
ApplicationState appState) throws Exception;
// TODO: This should eventually become cluster-Id + "AM_RM_TOKEN_SERVICE". See
// YARN-986
// YARN-1779
public static final Text AM_RM_TOKEN_SERVICE = new Text(
"AM_RM_TOKEN_SERVICE");

View File

@ -27,6 +27,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.DelegationKey;
@ -102,7 +103,7 @@ public void testRMDTMasterKeyStateOnRollingMasterKey() throws Exception {
org.apache.hadoop.yarn.api.records.Token delegationToken =
response.getRMDelegationToken();
Token<RMDelegationTokenIdentifier> token1 =
ConverterUtils.convertFromYarn(delegationToken, null);
ConverterUtils.convertFromYarn(delegationToken, (Text) null);
RMDelegationTokenIdentifier dtId1 = token1.decodeIdentifier();
// wait for the first rollMasterKey