diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 2e99d1a39f..a9da21a471 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -214,6 +214,9 @@ Release 2.0.4-beta - UNRELEASED fix failures in renewal of HistoryServer's delegations tokens. (Siddharth Seth via vinodkv) + MAPREDUCE-5088. MR Client gets an renewer token exception while Oozie is + submitting a job (Daryn Sharp via cos) + Release 2.0.3-alpha - 2013-02-06 INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java index 70bcbc56aa..902907447b 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java @@ -138,15 +138,6 @@ public class JobClient extends CLI { public static enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL } private TaskStatusFilter taskOutputFilter = TaskStatusFilter.FAILED; - /* notes that get delegation token was called. Again this is hack for oozie - * to make sure we add history server delegation tokens to the credentials - * for the job. Since the api only allows one delegation token to be returned, - * we have to add this hack. - */ - private boolean getDelegationTokenCalled = false; - /* do we need a HS delegation token for this client */ - static final String HS_DELEGATION_TOKEN_REQUIRED - = "mapreduce.history.server.delegationtoken.required"; static{ ConfigUtil.loadResources(); @@ -569,10 +560,6 @@ public RunningJob submitJob(final JobConf conf) throws FileNotFoundException, try { conf.setBooleanIfUnset("mapred.mapper.new-api", false); conf.setBooleanIfUnset("mapred.reducer.new-api", false); - if (getDelegationTokenCalled) { - conf.setBoolean(HS_DELEGATION_TOKEN_REQUIRED, getDelegationTokenCalled); - getDelegationTokenCalled = false; - } Job job = clientUgi.doAs(new PrivilegedExceptionAction () { @Override public Job run() throws IOException, ClassNotFoundException, @@ -1173,7 +1160,6 @@ public org.apache.hadoop.mapreduce.QueueAclsInfo[] run() */ public Token getDelegationToken(final Text renewer) throws IOException, InterruptedException { - getDelegationTokenCalled = true; return clientUgi.doAs(new PrivilegedExceptionAction>() { public Token run() throws IOException, diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java index 6a72917476..5903a4aaac 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java @@ -19,6 +19,7 @@ package org.apache.hadoop.mapred; import java.io.IOException; +import java.net.InetSocketAddress; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -87,6 +88,10 @@ public ClusterMetrics getClusterMetrics() throws IOException, return oldMetrics; } + InetSocketAddress getConnectAddress() { + return rmAddress; + } + @SuppressWarnings("rawtypes") public Token getDelegationToken(Text renewer) throws IOException, InterruptedException { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java index e8fd18a4c8..971492b0ea 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java @@ -60,6 +60,7 @@ import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils; import org.apache.hadoop.mapreduce.v2.util.MRApps; import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.security.token.Token; @@ -81,6 +82,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.security.client.RMTokenSelector; import org.apache.hadoop.yarn.util.BuilderUtils; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.ProtoUtils; @@ -90,7 +92,7 @@ /** * This class enables the current JobClient (0.22 hadoop) to run on YARN. */ -@SuppressWarnings({ "rawtypes", "unchecked" }) +@SuppressWarnings("unchecked") public class YARNRunner implements ClientProtocol { private static final Log LOG = LogFactory.getLog(YARNRunner.class); @@ -101,14 +103,6 @@ public class YARNRunner implements ClientProtocol { private Configuration conf; private final FileContext defaultFileContext; - /* usually is false unless the jobclient get delegation token is - * called. This is a hack wherein we do return a token from RM - * on getDelegationtoken but due to the restricted api on jobclient - * we just add a job history DT token when submitting a job. - */ - private static final boolean DEFAULT_HS_DELEGATION_TOKEN_REQUIRED = - false; - /** * Yarn runner incapsulates the client interface of * yarn @@ -185,6 +179,28 @@ public ClusterMetrics getClusterMetrics() throws IOException, return resMgrDelegate.getClusterMetrics(); } + @VisibleForTesting + void addHistoyToken(Credentials ts) throws IOException, InterruptedException { + /* check if we have a hsproxy, if not, no need */ + MRClientProtocol hsProxy = clientCache.getInitializedHSProxy(); + if (UserGroupInformation.isSecurityEnabled() && (hsProxy != null)) { + /* + * note that get delegation token was called. Again this is hack for oozie + * to make sure we add history server delegation tokens to the credentials + */ + RMTokenSelector tokenSelector = new RMTokenSelector(); + Text service = SecurityUtil.buildTokenService(resMgrDelegate + .getConnectAddress()); + if (tokenSelector.selectToken(service, ts.getAllTokens()) != null) { + Text hsService = SecurityUtil.buildTokenService(hsProxy + .getConnectAddress()); + if (ts.getToken(hsService) == null) { + ts.addToken(hsService, getDelegationTokenFromHS(hsProxy)); + } + } + } + } + @VisibleForTesting Token getDelegationTokenFromHS(MRClientProtocol hsProxy) throws IOException, InterruptedException { @@ -263,18 +279,8 @@ public long getTaskTrackerExpiryInterval() throws IOException, public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts) throws IOException, InterruptedException { - /* check if we have a hsproxy, if not, no need */ - MRClientProtocol hsProxy = clientCache.getInitializedHSProxy(); - if (hsProxy != null) { - // JobClient will set this flag if getDelegationToken is called, if so, get - // the delegation tokens for the HistoryServer also. - if (conf.getBoolean(JobClient.HS_DELEGATION_TOKEN_REQUIRED, - DEFAULT_HS_DELEGATION_TOKEN_REQUIRED)) { - Token hsDT = getDelegationTokenFromHS(hsProxy); - ts.addToken(hsDT.getService(), hsDT); - } - } - + addHistoyToken(ts); + // Upload only in security mode: TODO Path applicationTokensFile = new Path(jobSubmitDir, MRJobConfig.APPLICATION_TOKENS_FILE); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java index 5675742cfd..601268a7e7 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java @@ -20,8 +20,10 @@ import static org.mockito.Matchers.any; import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -30,6 +32,7 @@ import java.io.FileOutputStream; import java.io.IOException; import java.io.OutputStream; +import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.security.PrivilegedExceptionAction; import java.util.List; @@ -39,28 +42,24 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapred.ClientCache; -import org.apache.hadoop.mapred.ClientServiceDelegate; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.Master; -import org.apache.hadoop.mapred.ResourceMgrDelegate; -import org.apache.hadoop.mapred.YARNRunner; +import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.mapreduce.JobPriority; import org.apache.hadoop.mapreduce.JobStatus.State; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol; +import org.apache.hadoop.mapreduce.v2.api.MRDelegationTokenIdentifier; import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenRequest; import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenResponse; import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.ClientRMProtocol; -import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; -import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; -import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; @@ -69,21 +68,27 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.DelegationToken; import org.apache.hadoop.yarn.api.records.QueueInfo; -import org.apache.hadoop.yarn.api.records.YarnClusterMetrics; import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.api.records.YarnClusterMetrics; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; +import org.apache.hadoop.yarn.util.BuilderUtils; +import org.apache.hadoop.yarn.util.Records; import org.apache.log4j.Appender; import org.apache.log4j.Layout; import org.apache.log4j.Logger; @@ -146,7 +151,7 @@ public ApplicationSubmissionContext answer(InvocationOnMock invocation) } - @Test + @Test(timeout=20000) public void testJobKill() throws Exception { clientDelegate = mock(ClientServiceDelegate.class); when(clientDelegate.getJobStatus(any(JobID.class))).thenReturn(new @@ -171,7 +176,7 @@ public ClientServiceDelegate answer(InvocationOnMock invocation) verify(clientDelegate).killJob(jobId); } - @Test + @Test(timeout=20000) public void testJobSubmissionFailure() throws Exception { when(resourceMgrDelegate.submitApplication(any(ApplicationSubmissionContext.class))). thenReturn(appId); @@ -193,7 +198,7 @@ public void testJobSubmissionFailure() throws Exception { } } - @Test + @Test(timeout=20000) public void testResourceMgrDelegate() throws Exception { /* we not want a mock of resource mgr delegate */ final ClientRMProtocol clientRMProtocol = mock(ClientRMProtocol.class); @@ -259,8 +264,88 @@ public synchronized void start() { delegate.getQueueAclsForCurrentUser(); verify(clientRMProtocol).getQueueUserAcls(any(GetQueueUserAclsInfoRequest.class)); } - - @Test + + @Test(timeout=20000) + public void testGetHSDelegationToken() throws Exception { + try { + Configuration conf = new Configuration(); + + // Setup mock service + InetSocketAddress mockRmAddress = new InetSocketAddress("localhost", 4444); + Text rmTokenSevice = SecurityUtil.buildTokenService(mockRmAddress); + + InetSocketAddress mockHsAddress = new InetSocketAddress("localhost", 9200); + Text hsTokenSevice = SecurityUtil.buildTokenService(mockHsAddress); + + // Setup mock rm token + RMDelegationTokenIdentifier tokenIdentifier = new RMDelegationTokenIdentifier( + new Text("owner"), new Text("renewer"), new Text("real")); + Token token = new Token( + new byte[0], new byte[0], tokenIdentifier.getKind(), rmTokenSevice); + token.setKind(RMDelegationTokenIdentifier.KIND_NAME); + + // Setup mock history token + DelegationToken historyToken = BuilderUtils.newDelegationToken( + new byte[0], MRDelegationTokenIdentifier.KIND_NAME.toString(), + new byte[0], hsTokenSevice.toString()); + GetDelegationTokenResponse getDtResponse = Records + .newRecord(GetDelegationTokenResponse.class); + getDtResponse.setDelegationToken(historyToken); + + // mock services + MRClientProtocol mockHsProxy = mock(MRClientProtocol.class); + doReturn(mockHsAddress).when(mockHsProxy).getConnectAddress(); + doReturn(getDtResponse).when(mockHsProxy).getDelegationToken( + any(GetDelegationTokenRequest.class)); + + ResourceMgrDelegate rmDelegate = mock(ResourceMgrDelegate.class); + doReturn(mockRmAddress).when(rmDelegate).getConnectAddress(); + + ClientCache clientCache = mock(ClientCache.class); + doReturn(mockHsProxy).when(clientCache).getInitializedHSProxy(); + + Credentials creds = new Credentials(); + + YARNRunner yarnRunner = new YARNRunner(conf, rmDelegate, clientCache); + + // No HS token if no RM token + yarnRunner.addHistoyToken(creds); + verify(mockHsProxy, times(0)).getDelegationToken( + any(GetDelegationTokenRequest.class)); + + // No HS token if RM token, but secirity disabled. + creds.addToken(new Text("rmdt"), token); + yarnRunner.addHistoyToken(creds); + verify(mockHsProxy, times(0)).getDelegationToken( + any(GetDelegationTokenRequest.class)); + + conf.set(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION, + "kerberos"); + UserGroupInformation.setConfiguration(conf); + creds = new Credentials(); + + // No HS token if no RM token, security enabled + yarnRunner.addHistoyToken(creds); + verify(mockHsProxy, times(0)).getDelegationToken( + any(GetDelegationTokenRequest.class)); + + // HS token if RM token present, security enabled + creds.addToken(new Text("rmdt"), token); + yarnRunner.addHistoyToken(creds); + verify(mockHsProxy, times(1)).getDelegationToken( + any(GetDelegationTokenRequest.class)); + + // No additional call to get HS token if RM and HS token present + yarnRunner.addHistoyToken(creds); + verify(mockHsProxy, times(1)).getDelegationToken( + any(GetDelegationTokenRequest.class)); + } finally { + // Back to defaults. + UserGroupInformation.setConfiguration(new Configuration()); + } + } + + @Test(timeout=20000) public void testHistoryServerToken() throws Exception { //Set the master principal in the config conf.set(YarnConfiguration.RM_PRINCIPAL,"foo@LOCAL"); @@ -303,7 +388,7 @@ public Void run() throws Exception { }); } - @Test + @Test(timeout=20000) public void testAMAdminCommandOpts() throws Exception { JobConf jobConf = new JobConf(); @@ -366,7 +451,7 @@ public void testAMAdminCommandOpts() throws Exception { assertTrue("AM admin command opts is after user command opts.", adminIndex < userIndex); } } - @Test + @Test(timeout=20000) public void testWarnCommandOpts() throws Exception { Logger logger = Logger.getLogger(YARNRunner.class);