YARN-355. Fixes a bug where RM app submission could jam under load. Contributed by Daryn Sharp.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1443131 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Siddharth Seth 2013-02-06 19:03:52 +00:00
parent 8159dad856
commit ab16a37572
8 changed files with 252 additions and 129 deletions

View File

@ -234,6 +234,9 @@ Release 2.0.3-alpha - 2013-02-06
YARN-370. Fix SchedulerUtils to correctly round up the resource for YARN-370. Fix SchedulerUtils to correctly round up the resource for
containers. (Zhijie Shen via acmurthy) containers. (Zhijie Shen via acmurthy)
YARN-355. Fixes a bug where RM app submission could jam under load.
(Daryn Sharp via sseth)
Release 2.0.2-alpha - 2012-09-07 Release 2.0.2-alpha - 2012-09-07
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -25,13 +25,11 @@
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.yarn.api.ClientRMProtocol; import org.apache.hadoop.yarn.api.ClientRMProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
@ -47,8 +45,6 @@
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationReport;
@ -199,30 +195,6 @@ public DelegationToken getRMDelegationToken(Text renewer)
} }
// Not part of YarnClient itself. Placed in YarnClientImpl while renew/cancel
// are part of ClientRMProtocol.
@Private
public long renewRMDelegationToken(DelegationToken rmToken)
throws YarnRemoteException {
RenewDelegationTokenRequest request = Records
.newRecord(RenewDelegationTokenRequest.class);
request.setDelegationToken(rmToken);
RenewDelegationTokenResponse response = rmClient
.renewDelegationToken(request);
return response.getNextExpirationTime();
}
// Not part of YarnClient itself. Placed in YarnClientImpl while renew/cancel
// are part of ClietnRMProtocol
@Private
public void cancelRMDelegationToken(DelegationToken rmToken)
throws YarnRemoteException {
CancelDelegationTokenRequest request = Records
.newRecord(CancelDelegationTokenRequest.class);
request.setDelegationToken(rmToken);
rmClient.cancelDelegationToken(request);
}
private GetQueueInfoRequest private GetQueueInfoRequest
getQueueInfoRequest(String queueName, boolean includeApplications, getQueueInfoRequest(String queueName, boolean includeApplications,
boolean includeChildQueues, boolean recursive) { boolean includeChildQueues, boolean recursive) {

View File

@ -1,83 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.security;
import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenRenewer;
import org.apache.hadoop.yarn.api.records.DelegationToken;
import org.apache.hadoop.yarn.client.YarnClientImpl;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.util.BuilderUtils;
public class RMDelegationTokenRenewer extends TokenRenewer {
@Override
public boolean handleKind(Text kind) {
return RMDelegationTokenIdentifier.KIND_NAME.equals(kind);
}
@Override
public boolean isManaged(Token<?> token) throws IOException {
return true;
}
@Override
public long renew(Token<?> token, Configuration conf) throws IOException,
InterruptedException {
YarnClientImpl yarnClient = getYarnClient(conf,
SecurityUtil.getTokenServiceAddr(token));
try {
DelegationToken dToken = BuilderUtils.newDelegationToken(
token.getIdentifier(), token.getKind().toString(),
token.getPassword(), token.getService().toString());
return yarnClient.renewRMDelegationToken(dToken);
} finally {
yarnClient.stop();
}
}
@Override
public void cancel(Token<?> token, Configuration conf) throws IOException,
InterruptedException {
YarnClientImpl yarnClient = getYarnClient(conf,
SecurityUtil.getTokenServiceAddr(token));
try {
DelegationToken dToken = BuilderUtils.newDelegationToken(
token.getIdentifier(), token.getKind().toString(),
token.getPassword(), token.getService().toString());
yarnClient.cancelRMDelegationToken(dToken);
return;
} finally {
yarnClient.stop();
}
}
private YarnClientImpl getYarnClient(Configuration conf,
InetSocketAddress rmAddress) {
YarnClientImpl yarnClient = new YarnClientImpl(rmAddress);
yarnClient.init(conf);
yarnClient.start();
return yarnClient;
}
}

View File

@ -1,14 +0,0 @@
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
org.apache.hadoop.yarn.security.RMDelegationTokenRenewer;

View File

@ -19,10 +19,28 @@
package org.apache.hadoop.yarn.security.client; package org.apache.hadoop.yarn.security.client;
import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenRenewer;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier; import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
import org.apache.hadoop.yarn.api.ClientRMProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
import org.apache.hadoop.yarn.api.records.DelegationToken;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.Records;
/** /**
* Delegation Token Identifier that identifies the delegation tokens from the * Delegation Token Identifier that identifies the delegation tokens from the
@ -51,4 +69,100 @@ public RMDelegationTokenIdentifier(Text owner, Text renewer, Text realUser) {
public Text getKind() { public Text getKind() {
return KIND_NAME; return KIND_NAME;
} }
public static class Renewer extends TokenRenewer {
@Override
public boolean handleKind(Text kind) {
return KIND_NAME.equals(kind);
}
@Override
public boolean isManaged(Token<?> token) throws IOException {
return true;
}
private static
AbstractDelegationTokenSecretManager<RMDelegationTokenIdentifier> localSecretManager;
private static InetSocketAddress localServiceAddress;
@Private
public static void setSecretManager(
AbstractDelegationTokenSecretManager<RMDelegationTokenIdentifier> secretManager,
InetSocketAddress serviceAddress) {
localSecretManager = secretManager;
localServiceAddress = serviceAddress;
}
@SuppressWarnings("unchecked")
@Override
public long renew(Token<?> token, Configuration conf) throws IOException,
InterruptedException {
final ClientRMProtocol rmClient = getRmClient(token, conf);
if (rmClient != null) {
try {
RenewDelegationTokenRequest request =
Records.newRecord(RenewDelegationTokenRequest.class);
request.setDelegationToken(convertToProtoToken(token));
return rmClient.renewDelegationToken(request).getNextExpirationTime();
} finally {
RPC.stopProxy(rmClient);
}
} else {
return localSecretManager.renewToken(
(Token<RMDelegationTokenIdentifier>)token, getRenewer(token));
}
}
@SuppressWarnings("unchecked")
@Override
public void cancel(Token<?> token, Configuration conf) throws IOException,
InterruptedException {
final ClientRMProtocol rmClient = getRmClient(token, conf);
if (rmClient != null) {
try {
CancelDelegationTokenRequest request =
Records.newRecord(CancelDelegationTokenRequest.class);
request.setDelegationToken(convertToProtoToken(token));
rmClient.cancelDelegationToken(request);
} finally {
RPC.stopProxy(rmClient);
}
} else {
localSecretManager.cancelToken(
(Token<RMDelegationTokenIdentifier>)token, getRenewer(token));
}
}
private static ClientRMProtocol getRmClient(Token<?> token,
Configuration conf) {
InetSocketAddress addr = SecurityUtil.getTokenServiceAddr(token);
if (localSecretManager != null) {
// return null if it's our token
if (localServiceAddress.getAddress().isAnyLocalAddress()) {
if (NetUtils.isLocalAddress(addr.getAddress()) &&
addr.getPort() == localServiceAddress.getPort()) {
return null;
}
} else if (addr.equals(localServiceAddress)) {
return null;
}
}
final YarnRPC rpc = YarnRPC.create(conf);
return (ClientRMProtocol)rpc.getProxy(ClientRMProtocol.class, addr, conf);
}
// get renewer so we can always renew our own tokens
@SuppressWarnings("unchecked")
private static String getRenewer(Token<?> token) throws IOException {
return ((Token<RMDelegationTokenIdentifier>)token).decodeIdentifier()
.getRenewer().toString();
}
private static DelegationToken convertToProtoToken(Token<?> token) {
return BuilderUtils.newDelegationToken(
token.getIdentifier(), token.getKind().toString(),
token.getPassword(), token.getService().toString());
}
}
} }

View File

@ -13,3 +13,4 @@
# #
org.apache.hadoop.yarn.security.ApplicationTokenIdentifier$Renewer org.apache.hadoop.yarn.security.ApplicationTokenIdentifier$Renewer
org.apache.hadoop.yarn.security.ContainerTokenIdentifier$Renewer org.apache.hadoop.yarn.security.ContainerTokenIdentifier$Renewer
org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier$Renewer

View File

@ -157,6 +157,10 @@ public void start() {
this.server.start(); this.server.start();
clientBindAddress = conf.updateConnectAddr(YarnConfiguration.RM_ADDRESS, clientBindAddress = conf.updateConnectAddr(YarnConfiguration.RM_ADDRESS,
server.getListenerAddress()); server.getListenerAddress());
// enable RM to short-circuit token operations directly to itself
RMDelegationTokenIdentifier.Renewer.setSecretManager(
rmDTSecretManager, clientBindAddress);
super.start(); super.start();
} }

View File

@ -17,13 +17,12 @@
package org.apache.hadoop.yarn.server.resourcemanager; package org.apache.hadoop.yarn.server.resourcemanager;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.*;
import static org.junit.Assert.fail; import static org.mockito.Mockito.*;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import java.io.IOException; import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException; import java.lang.reflect.UndeclaredThrowableException;
import java.net.InetAddress;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.security.PrivilegedAction; import java.security.PrivilegedAction;
import java.security.PrivilegedExceptionAction; import java.security.PrivilegedExceptionAction;
@ -34,9 +33,15 @@
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.api.ClientRMProtocol; import org.apache.hadoop.yarn.api.ClientRMProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest; import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
@ -46,12 +51,14 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.RMDelegationTokenSecretManager; import org.apache.hadoop.yarn.server.RMDelegationTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.util.BuilderUtils; import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.ProtoUtils; import org.apache.hadoop.yarn.util.ProtoUtils;
import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.Records;
import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -59,6 +66,10 @@ public class TestClientRMTokens {
private static final Log LOG = LogFactory.getLog(TestClientRMTokens.class); private static final Log LOG = LogFactory.getLog(TestClientRMTokens.class);
@Before
public void resetSecretManager() {
RMDelegationTokenIdentifier.Renewer.setSecretManager(null, null);
}
@Test @Test
public void testDelegationToken() throws IOException, InterruptedException { public void testDelegationToken() throws IOException, InterruptedException {
@ -200,7 +211,122 @@ public void testDelegationToken() throws IOException, InterruptedException {
RPC.stopProxy(clientRMWithDT); RPC.stopProxy(clientRMWithDT);
} }
} }
}
@Test
public void testShortCircuitRenewCancel()
throws IOException, InterruptedException {
InetSocketAddress addr =
new InetSocketAddress(InetAddress.getLocalHost(), 123);
checkShortCircuitRenewCancel(addr, addr, true);
}
@Test
public void testShortCircuitRenewCancelWildcardAddress()
throws IOException, InterruptedException {
InetSocketAddress rmAddr = new InetSocketAddress(123);
checkShortCircuitRenewCancel(
rmAddr,
new InetSocketAddress(InetAddress.getLocalHost(), rmAddr.getPort()),
true);
}
@Test
public void testShortCircuitRenewCancelSameHostDifferentPort()
throws IOException, InterruptedException {
InetSocketAddress rmAddr =
new InetSocketAddress(InetAddress.getLocalHost(), 123);
checkShortCircuitRenewCancel(
rmAddr,
new InetSocketAddress(rmAddr.getAddress(), rmAddr.getPort()+1),
false);
}
@Test
public void testShortCircuitRenewCancelDifferentHostSamePort()
throws IOException, InterruptedException {
InetSocketAddress rmAddr =
new InetSocketAddress(InetAddress.getLocalHost(), 123);
checkShortCircuitRenewCancel(
rmAddr,
new InetSocketAddress("1.1.1.1", rmAddr.getPort()),
false);
}
@Test
public void testShortCircuitRenewCancelDifferentHostDifferentPort()
throws IOException, InterruptedException {
InetSocketAddress rmAddr =
new InetSocketAddress(InetAddress.getLocalHost(), 123);
checkShortCircuitRenewCancel(
rmAddr,
new InetSocketAddress("1.1.1.1", rmAddr.getPort()+1),
false);
}
@SuppressWarnings("unchecked")
private void checkShortCircuitRenewCancel(InetSocketAddress rmAddr,
InetSocketAddress serviceAddr,
boolean shouldShortCircuit
) throws IOException, InterruptedException {
Configuration conf = new Configuration();
conf.setClass(YarnConfiguration.IPC_RPC_IMPL,
YarnBadRPC.class, YarnRPC.class);
RMDelegationTokenSecretManager secretManager =
mock(RMDelegationTokenSecretManager.class);
RMDelegationTokenIdentifier.Renewer.setSecretManager(secretManager, rmAddr);
RMDelegationTokenIdentifier ident = new RMDelegationTokenIdentifier(
new Text("owner"), new Text("renewer"), null);
Token<RMDelegationTokenIdentifier> token =
new Token<RMDelegationTokenIdentifier>(ident, secretManager);
SecurityUtil.setTokenService(token, serviceAddr);
if (shouldShortCircuit) {
token.renew(conf);
verify(secretManager).renewToken(eq(token), eq("renewer"));
reset(secretManager);
token.cancel(conf);
verify(secretManager).cancelToken(eq(token), eq("renewer"));
} else {
try {
token.renew(conf);
fail();
} catch (RuntimeException e) {
assertEquals("getProxy", e.getMessage());
}
verify(secretManager, never()).renewToken(any(Token.class), anyString());
try {
token.cancel(conf);
fail();
} catch (RuntimeException e) {
assertEquals("getProxy", e.getMessage());
}
verify(secretManager, never()).cancelToken(any(Token.class), anyString());
}
}
@SuppressWarnings("rawtypes")
public static class YarnBadRPC extends YarnRPC {
@Override
public Object getProxy(Class protocol, InetSocketAddress addr,
Configuration conf) {
throw new RuntimeException("getProxy");
}
@Override
public void stopProxy(Object proxy, Configuration conf) {
throw new RuntimeException("stopProxy");
}
@Override
public Server getServer(Class protocol, Object instance,
InetSocketAddress addr, Configuration conf,
SecretManager<? extends TokenIdentifier> secretManager,
int numHandlers, String portRangeConfig) {
throw new RuntimeException("getServer");
}
} }
// Get the delegation token directly as it is a little difficult to setup // Get the delegation token directly as it is a little difficult to setup