diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 1fd179dc1a..620215692d 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -234,6 +234,9 @@ Release 2.0.3-alpha - 2013-02-06 YARN-370. Fix SchedulerUtils to correctly round up the resource for containers. (Zhijie Shen via acmurthy) + YARN-355. Fixes a bug where RM app submission could jam under load. + (Daryn Sharp via sseth) + Release 2.0.2-alpha - 2012-09-07 INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClientImpl.java index 14994f97a7..eb84b31c79 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClientImpl.java @@ -25,13 +25,11 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.yarn.api.ClientRMProtocol; -import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; @@ -47,8 +45,6 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest; import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; -import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest; -import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; @@ -199,30 +195,6 @@ public DelegationToken getRMDelegationToken(Text renewer) } - // Not part of YarnClient itself. Placed in YarnClientImpl while renew/cancel - // are part of ClientRMProtocol. - @Private - public long renewRMDelegationToken(DelegationToken rmToken) - throws YarnRemoteException { - RenewDelegationTokenRequest request = Records - .newRecord(RenewDelegationTokenRequest.class); - request.setDelegationToken(rmToken); - RenewDelegationTokenResponse response = rmClient - .renewDelegationToken(request); - return response.getNextExpirationTime(); - } - - // Not part of YarnClient itself. Placed in YarnClientImpl while renew/cancel - // are part of ClietnRMProtocol - @Private - public void cancelRMDelegationToken(DelegationToken rmToken) - throws YarnRemoteException { - CancelDelegationTokenRequest request = Records - .newRecord(CancelDelegationTokenRequest.class); - request.setDelegationToken(rmToken); - rmClient.cancelDelegationToken(request); - } - private GetQueueInfoRequest getQueueInfoRequest(String queueName, boolean includeApplications, boolean includeChildQueues, boolean recursive) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/security/RMDelegationTokenRenewer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/security/RMDelegationTokenRenewer.java deleted file mode 100644 index 3f1caeec18..0000000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/security/RMDelegationTokenRenewer.java +++ /dev/null @@ -1,83 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.hadoop.yarn.security; - -import java.io.IOException; -import java.net.InetSocketAddress; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.security.SecurityUtil; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.security.token.TokenRenewer; -import org.apache.hadoop.yarn.api.records.DelegationToken; -import org.apache.hadoop.yarn.client.YarnClientImpl; -import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; -import org.apache.hadoop.yarn.util.BuilderUtils; - -public class RMDelegationTokenRenewer extends TokenRenewer { - - @Override - public boolean handleKind(Text kind) { - return RMDelegationTokenIdentifier.KIND_NAME.equals(kind); - } - - @Override - public boolean isManaged(Token token) throws IOException { - return true; - } - - @Override - public long renew(Token token, Configuration conf) throws IOException, - InterruptedException { - YarnClientImpl yarnClient = getYarnClient(conf, - SecurityUtil.getTokenServiceAddr(token)); - try { - DelegationToken dToken = BuilderUtils.newDelegationToken( - token.getIdentifier(), token.getKind().toString(), - token.getPassword(), token.getService().toString()); - return yarnClient.renewRMDelegationToken(dToken); - } finally { - yarnClient.stop(); - } - } - - @Override - public void cancel(Token token, Configuration conf) throws IOException, - InterruptedException { - YarnClientImpl yarnClient = getYarnClient(conf, - SecurityUtil.getTokenServiceAddr(token)); - try { - DelegationToken dToken = BuilderUtils.newDelegationToken( - token.getIdentifier(), token.getKind().toString(), - token.getPassword(), token.getService().toString()); - yarnClient.cancelRMDelegationToken(dToken); - return; - } finally { - yarnClient.stop(); - } - } - - private YarnClientImpl getYarnClient(Configuration conf, - InetSocketAddress rmAddress) { - YarnClientImpl yarnClient = new YarnClientImpl(rmAddress); - yarnClient.init(conf); - yarnClient.start(); - return yarnClient; - } -} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer deleted file mode 100644 index 9e78b1187e..0000000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer +++ /dev/null @@ -1,14 +0,0 @@ -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -org.apache.hadoop.yarn.security.RMDelegationTokenRenewer; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/client/RMDelegationTokenIdentifier.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/client/RMDelegationTokenIdentifier.java index fd3dbf06b6..73bdce4e99 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/client/RMDelegationTokenIdentifier.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/client/RMDelegationTokenIdentifier.java @@ -19,10 +19,28 @@ package org.apache.hadoop.yarn.security.client; +import java.io.IOException; +import java.net.InetSocketAddress; + +import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Evolving; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenRenewer; import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier; +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager; +import org.apache.hadoop.yarn.api.ClientRMProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest; +import org.apache.hadoop.yarn.api.records.DelegationToken; +import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.util.BuilderUtils; +import org.apache.hadoop.yarn.util.Records; /** * Delegation Token Identifier that identifies the delegation tokens from the @@ -51,4 +69,100 @@ public RMDelegationTokenIdentifier(Text owner, Text renewer, Text realUser) { public Text getKind() { return KIND_NAME; } + + public static class Renewer extends TokenRenewer { + + @Override + public boolean handleKind(Text kind) { + return KIND_NAME.equals(kind); + } + + @Override + public boolean isManaged(Token token) throws IOException { + return true; + } + + private static + AbstractDelegationTokenSecretManager localSecretManager; + private static InetSocketAddress localServiceAddress; + + @Private + public static void setSecretManager( + AbstractDelegationTokenSecretManager secretManager, + InetSocketAddress serviceAddress) { + localSecretManager = secretManager; + localServiceAddress = serviceAddress; + } + + @SuppressWarnings("unchecked") + @Override + public long renew(Token token, Configuration conf) throws IOException, + InterruptedException { + final ClientRMProtocol rmClient = getRmClient(token, conf); + if (rmClient != null) { + try { + RenewDelegationTokenRequest request = + Records.newRecord(RenewDelegationTokenRequest.class); + request.setDelegationToken(convertToProtoToken(token)); + return rmClient.renewDelegationToken(request).getNextExpirationTime(); + } finally { + RPC.stopProxy(rmClient); + } + } else { + return localSecretManager.renewToken( + (Token)token, getRenewer(token)); + } + } + + @SuppressWarnings("unchecked") + @Override + public void cancel(Token token, Configuration conf) throws IOException, + InterruptedException { + final ClientRMProtocol rmClient = getRmClient(token, conf); + if (rmClient != null) { + try { + CancelDelegationTokenRequest request = + Records.newRecord(CancelDelegationTokenRequest.class); + request.setDelegationToken(convertToProtoToken(token)); + rmClient.cancelDelegationToken(request); + } finally { + RPC.stopProxy(rmClient); + } + } else { + localSecretManager.cancelToken( + (Token)token, getRenewer(token)); + } + } + + private static ClientRMProtocol getRmClient(Token token, + Configuration conf) { + InetSocketAddress addr = SecurityUtil.getTokenServiceAddr(token); + if (localSecretManager != null) { + // return null if it's our token + if (localServiceAddress.getAddress().isAnyLocalAddress()) { + if (NetUtils.isLocalAddress(addr.getAddress()) && + addr.getPort() == localServiceAddress.getPort()) { + return null; + } + } else if (addr.equals(localServiceAddress)) { + return null; + } + } + final YarnRPC rpc = YarnRPC.create(conf); + return (ClientRMProtocol)rpc.getProxy(ClientRMProtocol.class, addr, conf); + } + + // get renewer so we can always renew our own tokens + @SuppressWarnings("unchecked") + private static String getRenewer(Token token) throws IOException { + return ((Token)token).decodeIdentifier() + .getRenewer().toString(); + } + + private static DelegationToken convertToProtoToken(Token token) { + return BuilderUtils.newDelegationToken( + token.getIdentifier(), token.getKind().toString(), + token.getPassword(), token.getService().toString()); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer index 3380cb8b01..0e87a7c2d1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer @@ -13,3 +13,4 @@ # org.apache.hadoop.yarn.security.ApplicationTokenIdentifier$Renewer org.apache.hadoop.yarn.security.ContainerTokenIdentifier$Renewer +org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier$Renewer diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java index a464b3ae00..e0522a33fd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java @@ -157,6 +157,10 @@ public void start() { this.server.start(); clientBindAddress = conf.updateConnectAddr(YarnConfiguration.RM_ADDRESS, server.getListenerAddress()); + // enable RM to short-circuit token operations directly to itself + RMDelegationTokenIdentifier.Renewer.setSecretManager( + rmDTSecretManager, clientBindAddress); + super.start(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java index 3f78696474..5ee851b843 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java @@ -17,13 +17,12 @@ package org.apache.hadoop.yarn.server.resourcemanager; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; import java.io.IOException; import java.lang.reflect.UndeclaredThrowableException; +import java.net.InetAddress; import java.net.InetSocketAddress; import java.security.PrivilegedAction; import java.security.PrivilegedExceptionAction; @@ -34,9 +33,15 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.io.Text; import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; +import org.apache.hadoop.security.token.SecretManager; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.yarn.api.ClientRMProtocol; import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest; @@ -46,12 +51,14 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.RMDelegationTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.util.BuilderUtils; import org.apache.hadoop.yarn.util.ProtoUtils; import org.apache.hadoop.yarn.util.Records; +import org.junit.Before; import org.junit.Test; @@ -59,6 +66,10 @@ public class TestClientRMTokens { private static final Log LOG = LogFactory.getLog(TestClientRMTokens.class); + @Before + public void resetSecretManager() { + RMDelegationTokenIdentifier.Renewer.setSecretManager(null, null); + } @Test public void testDelegationToken() throws IOException, InterruptedException { @@ -200,7 +211,122 @@ public void testDelegationToken() throws IOException, InterruptedException { RPC.stopProxy(clientRMWithDT); } } + } + + @Test + public void testShortCircuitRenewCancel() + throws IOException, InterruptedException { + InetSocketAddress addr = + new InetSocketAddress(InetAddress.getLocalHost(), 123); + checkShortCircuitRenewCancel(addr, addr, true); + } + + @Test + public void testShortCircuitRenewCancelWildcardAddress() + throws IOException, InterruptedException { + InetSocketAddress rmAddr = new InetSocketAddress(123); + checkShortCircuitRenewCancel( + rmAddr, + new InetSocketAddress(InetAddress.getLocalHost(), rmAddr.getPort()), + true); + } + + @Test + public void testShortCircuitRenewCancelSameHostDifferentPort() + throws IOException, InterruptedException { + InetSocketAddress rmAddr = + new InetSocketAddress(InetAddress.getLocalHost(), 123); + checkShortCircuitRenewCancel( + rmAddr, + new InetSocketAddress(rmAddr.getAddress(), rmAddr.getPort()+1), + false); + } + + @Test + public void testShortCircuitRenewCancelDifferentHostSamePort() + throws IOException, InterruptedException { + InetSocketAddress rmAddr = + new InetSocketAddress(InetAddress.getLocalHost(), 123); + checkShortCircuitRenewCancel( + rmAddr, + new InetSocketAddress("1.1.1.1", rmAddr.getPort()), + false); + } + + @Test + public void testShortCircuitRenewCancelDifferentHostDifferentPort() + throws IOException, InterruptedException { + InetSocketAddress rmAddr = + new InetSocketAddress(InetAddress.getLocalHost(), 123); + checkShortCircuitRenewCancel( + rmAddr, + new InetSocketAddress("1.1.1.1", rmAddr.getPort()+1), + false); + } + + @SuppressWarnings("unchecked") + private void checkShortCircuitRenewCancel(InetSocketAddress rmAddr, + InetSocketAddress serviceAddr, + boolean shouldShortCircuit + ) throws IOException, InterruptedException { + Configuration conf = new Configuration(); + conf.setClass(YarnConfiguration.IPC_RPC_IMPL, + YarnBadRPC.class, YarnRPC.class); + RMDelegationTokenSecretManager secretManager = + mock(RMDelegationTokenSecretManager.class); + RMDelegationTokenIdentifier.Renewer.setSecretManager(secretManager, rmAddr); + + RMDelegationTokenIdentifier ident = new RMDelegationTokenIdentifier( + new Text("owner"), new Text("renewer"), null); + Token token = + new Token(ident, secretManager); + + SecurityUtil.setTokenService(token, serviceAddr); + if (shouldShortCircuit) { + token.renew(conf); + verify(secretManager).renewToken(eq(token), eq("renewer")); + reset(secretManager); + token.cancel(conf); + verify(secretManager).cancelToken(eq(token), eq("renewer")); + } else { + try { + token.renew(conf); + fail(); + } catch (RuntimeException e) { + assertEquals("getProxy", e.getMessage()); + } + verify(secretManager, never()).renewToken(any(Token.class), anyString()); + try { + token.cancel(conf); + fail(); + } catch (RuntimeException e) { + assertEquals("getProxy", e.getMessage()); + } + verify(secretManager, never()).cancelToken(any(Token.class), anyString()); + } + } + + @SuppressWarnings("rawtypes") + public static class YarnBadRPC extends YarnRPC { + @Override + public Object getProxy(Class protocol, InetSocketAddress addr, + Configuration conf) { + throw new RuntimeException("getProxy"); + } + + @Override + public void stopProxy(Object proxy, Configuration conf) { + throw new RuntimeException("stopProxy"); + } + + @Override + public Server getServer(Class protocol, Object instance, + InetSocketAddress addr, Configuration conf, + SecretManager secretManager, + int numHandlers, String portRangeConfig) { + throw new RuntimeException("getServer"); + } } // Get the delegation token directly as it is a little difficult to setup