diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index f77c80c644..77aab5002a 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -1452,6 +1452,8 @@ Release 0.23.0 - Unreleased MAPREDUCE-3095. fairscheduler ivy including wrong version for hdfs. (John George via mahadev) + MAPREDUCE-3054. Unable to kill submitted jobs. (mahadev) + Release 0.22.0 - Unreleased INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientCache.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientCache.java index 80c8d91a1b..20c6ce7c00 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientCache.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientCache.java @@ -1,20 +1,20 @@ /** -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.hadoop.mapred; @@ -42,29 +42,29 @@ public class ClientCache { private final Configuration conf; private final ResourceMgrDelegate rm; - + private static final Log LOG = LogFactory.getLog(ClientCache.class); private Map cache = - new HashMap(); - + new HashMap(); + private MRClientProtocol hsProxy; - ClientCache(Configuration conf, ResourceMgrDelegate rm) { + public ClientCache(Configuration conf, ResourceMgrDelegate rm) { this.conf = conf; this.rm = rm; } //TODO: evict from the cache on some threshold - synchronized ClientServiceDelegate getClient(JobID jobId) { - if (hsProxy == null) { + public synchronized ClientServiceDelegate getClient(JobID jobId) { + if (hsProxy == null) { try { - hsProxy = instantiateHistoryProxy(); - } catch (IOException e) { - LOG.warn("Could not connect to History server.", e); - throw new YarnException("Could not connect to History server.", e); - } - } + hsProxy = instantiateHistoryProxy(); + } catch (IOException e) { + LOG.warn("Could not connect to History server.", e); + throw new YarnException("Could not connect to History server.", e); + } + } ClientServiceDelegate client = cache.get(jobId); if (client == null) { client = new ClientServiceDelegate(conf, rm, jobId, hsProxy); @@ -74,7 +74,7 @@ synchronized ClientServiceDelegate getClient(JobID jobId) { } private MRClientProtocol instantiateHistoryProxy() - throws IOException { + throws IOException { final String serviceAddr = conf.get(JHAdminConfig.MR_HISTORY_ADDRESS); if (StringUtils.isEmpty(serviceAddr)) { return null; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java index 429d350c5a..341e17e951 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java @@ -70,7 +70,7 @@ import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier; import org.apache.hadoop.yarn.security.SchedulerSecurityInfo; -class ClientServiceDelegate { +public class ClientServiceDelegate { private static final Log LOG = LogFactory.getLog(ClientServiceDelegate.class); // Caches for per-user NotRunningJobs @@ -87,7 +87,7 @@ class ClientServiceDelegate { private RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); private static String UNKNOWN_USER = "Unknown User"; - ClientServiceDelegate(Configuration conf, ResourceMgrDelegate rm, + public ClientServiceDelegate(Configuration conf, ResourceMgrDelegate rm, JobID jobId, MRClientProtocol historyServerProxy) { this.conf = new Configuration(conf); // Cloning for modifying. // For faster redirects from AM to HS. @@ -279,7 +279,7 @@ private synchronized Object invoke(String method, Class argClass, } } - org.apache.hadoop.mapreduce.Counters getJobCounters(JobID arg0) throws IOException, + public org.apache.hadoop.mapreduce.Counters getJobCounters(JobID arg0) throws IOException, InterruptedException { org.apache.hadoop.mapreduce.v2.api.records.JobId jobID = TypeConverter.toYarn(arg0); GetCountersRequest request = recordFactory.newRecordInstance(GetCountersRequest.class); @@ -290,7 +290,7 @@ org.apache.hadoop.mapreduce.Counters getJobCounters(JobID arg0) throws IOExcepti } - TaskCompletionEvent[] getTaskCompletionEvents(JobID arg0, int arg1, int arg2) + public TaskCompletionEvent[] getTaskCompletionEvents(JobID arg0, int arg1, int arg2) throws IOException, InterruptedException { org.apache.hadoop.mapreduce.v2.api.records.JobId jobID = TypeConverter .toYarn(arg0); @@ -308,7 +308,7 @@ TaskCompletionEvent[] getTaskCompletionEvents(JobID arg0, int arg1, int arg2) .toArray(new org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent[0])); } - String[] getTaskDiagnostics(org.apache.hadoop.mapreduce.TaskAttemptID arg0) + public String[] getTaskDiagnostics(org.apache.hadoop.mapreduce.TaskAttemptID arg0) throws IOException, InterruptedException { org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID = TypeConverter @@ -326,7 +326,7 @@ String[] getTaskDiagnostics(org.apache.hadoop.mapreduce.TaskAttemptID arg0) return result; } - JobStatus getJobStatus(JobID oldJobID) throws YarnRemoteException { + public JobStatus getJobStatus(JobID oldJobID) throws YarnRemoteException { org.apache.hadoop.mapreduce.v2.api.records.JobId jobId = TypeConverter.toYarn(oldJobID); GetJobReportRequest request = @@ -339,7 +339,7 @@ JobStatus getJobStatus(JobID oldJobID) throws YarnRemoteException { return TypeConverter.fromYarn(report, jobFile); } - org.apache.hadoop.mapreduce.TaskReport[] getTaskReports(JobID oldJobID, TaskType taskType) + public org.apache.hadoop.mapreduce.TaskReport[] getTaskReports(JobID oldJobID, TaskType taskType) throws YarnRemoteException, YarnRemoteException { org.apache.hadoop.mapreduce.v2.api.records.JobId jobId = TypeConverter.toYarn(oldJobID); @@ -356,7 +356,7 @@ org.apache.hadoop.mapreduce.TaskReport[] getTaskReports(JobID oldJobID, TaskType (taskReports).toArray(new org.apache.hadoop.mapreduce.TaskReport[0]); } - boolean killTask(TaskAttemptID taskAttemptID, boolean fail) + public boolean killTask(TaskAttemptID taskAttemptID, boolean fail) throws YarnRemoteException { org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID = TypeConverter.toYarn(taskAttemptID); @@ -372,7 +372,7 @@ boolean killTask(TaskAttemptID taskAttemptID, boolean fail) return true; } - boolean killJob(JobID oldJobID) + public boolean killJob(JobID oldJobID) throws YarnRemoteException { org.apache.hadoop.mapreduce.v2.api.records.JobId jobId = TypeConverter.toYarn(oldJobID); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java index be5b862100..65e51735dd 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java @@ -44,7 +44,7 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.ClientRMProtocol; -import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; @@ -79,6 +79,10 @@ public class ResourceMgrDelegate { private ApplicationId applicationId; private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); + /** + * Delegate responsible for communicating with the Resource Manager's {@link ClientRMProtocol}. + * @param conf the configuration object. + */ public ResourceMgrDelegate(YarnConfiguration conf) { this.conf = conf; YarnRPC rpc = YarnRPC.create(this.conf); @@ -97,6 +101,16 @@ public ResourceMgrDelegate(YarnConfiguration conf) { LOG.info("Connected to ResourceManager at " + rmAddress); } + /** + * Used for injecting applicationsManager, mostly for testing. + * @param conf the configuration object + * @param applicationsManager the handle to talk the resource managers {@link ClientRMProtocol}. + */ + public ResourceMgrDelegate(YarnConfiguration conf, ClientRMProtocol applicationsManager) { + this.conf = conf; + this.applicationsManager = applicationsManager; + } + public void cancelDelegationToken(Token arg0) throws IOException, InterruptedException { return; @@ -294,9 +308,9 @@ public ApplicationId submitApplication(ApplicationSubmissionContext appContext) } public void killApplication(ApplicationId applicationId) throws IOException { - FinishApplicationRequest request = recordFactory.newRecordInstance(FinishApplicationRequest.class); + KillApplicationRequest request = recordFactory.newRecordInstance(KillApplicationRequest.class); request.setApplicationId(applicationId); - applicationsManager.finishApplication(request); + applicationsManager.forceKillApplication(request); LOG.info("Killing application " + applicationId); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java index 20bd976b8d..a11968a16f 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java @@ -105,10 +105,22 @@ public YARNRunner(Configuration conf) { * @param resMgrDelegate the resourcemanager client handle. */ public YARNRunner(Configuration conf, ResourceMgrDelegate resMgrDelegate) { + this(conf, resMgrDelegate, new ClientCache(conf, resMgrDelegate)); + } + + /** + * Similar to {@link YARNRunner#YARNRunner(Configuration, ResourceMgrDelegate)} + * but allowing injecting {@link ClientCache}. Enable mocking and testing. + * @param conf the configuration object + * @param resMgrDelegate the resource manager delegate + * @param clientCache the client cache object. + */ + public YARNRunner(Configuration conf, ResourceMgrDelegate resMgrDelegate, + ClientCache clientCache) { this.conf = conf; try { this.resMgrDelegate = resMgrDelegate; - this.clientCache = new ClientCache(this.conf, resMgrDelegate); + this.clientCache = clientCache; this.defaultFileContext = FileContext.getFileContext(this.conf); } catch (UnsupportedFileSystemException ufe) { throw new RuntimeException("Error in instantiating YarnClient", ufe); @@ -429,9 +441,35 @@ public TaskReport[] getTaskReports(JobID jobID, TaskType taskType) @Override public void killJob(JobID arg0) throws IOException, InterruptedException { - if (!clientCache.getClient(arg0).killJob(arg0)) { - resMgrDelegate.killApplication(TypeConverter.toYarn(arg0).getAppId()); - } + /* check if the status is not running, if not send kill to RM */ + JobStatus status = clientCache.getClient(arg0).getJobStatus(arg0); + if (status.getState() != JobStatus.State.RUNNING) { + resMgrDelegate.killApplication(TypeConverter.toYarn(arg0).getAppId()); + return; + } + + try { + /* send a kill to the AM */ + clientCache.getClient(arg0).killJob(arg0); + long currentTimeMillis = System.currentTimeMillis(); + long timeKillIssued = currentTimeMillis; + while ((currentTimeMillis < timeKillIssued + 10000L) && (status.getState() + != JobStatus.State.KILLED)) { + try { + Thread.sleep(1000L); + } catch(InterruptedException ie) { + /** interrupted, just break */ + break; + } + currentTimeMillis = System.currentTimeMillis(); + status = clientCache.getClient(arg0).getJobStatus(arg0); + } + } catch(IOException io) { + LOG.debug("Error when checking for application status", io); + } + if (status.getState() != JobStatus.State.KILLED) { + resMgrDelegate.killApplication(TypeConverter.toYarn(arg0).getAppId()); + } } @Override diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java index 12b1c2cc9c..54b0422648 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java @@ -68,8 +68,8 @@ import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.api.ClientRMProtocol; -import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationRequest; -import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; @@ -288,9 +288,9 @@ public SubmitApplicationResponse submitApplication( } @Override - public FinishApplicationResponse finishApplication( - FinishApplicationRequest request) throws YarnRemoteException { - return null; + public KillApplicationResponse forceKillApplication( + KillApplicationRequest request) throws YarnRemoteException { + return recordFactory.newRecordInstance(KillApplicationResponse.class); } @Override @@ -451,7 +451,7 @@ public GetTaskAttemptReportResponse getTaskAttemptReport( @Override public KillJobResponse killJob(KillJobRequest request) throws YarnRemoteException { - return null; + return recordFactory.newRecordInstance(KillJobResponse.class); } @Override diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestYARNRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestYARNRunner.java index bc0dfe5fa4..8878851d5c 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestYARNRunner.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestYARNRunner.java @@ -22,6 +22,7 @@ import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.io.File; @@ -36,15 +37,38 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.ClientCache; +import org.apache.hadoop.mapred.ClientServiceDelegate; +import org.apache.hadoop.mapred.JobStatus; import org.apache.hadoop.mapred.ResourceMgrDelegate; import org.apache.hadoop.mapred.YARNRunner; import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.JobPriority; +import org.apache.hadoop.mapreduce.JobStatus.State; import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.yarn.api.ClientRMProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationIdRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationIdResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationState; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.QueueInfo; +import org.apache.hadoop.yarn.api.records.YarnClusterMetrics; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; @@ -54,9 +78,8 @@ import org.mockito.stubbing.Answer; /** - * Test if the jobclient shows enough diagnostics - * on a job failure. - * + * Test YarnRunner and make sure the client side plugin works + * fine */ public class TestYARNRunner extends TestCase { private static final Log LOG = LogFactory.getLog(TestYARNRunner.class); @@ -65,18 +88,22 @@ public class TestYARNRunner extends TestCase { private YARNRunner yarnRunner; private ResourceMgrDelegate resourceMgrDelegate; private YarnConfiguration conf; + private ClientCache clientCache; private ApplicationId appId; private JobID jobId; private File testWorkDir = new File("target", TestYARNRunner.class.getName()); private ApplicationSubmissionContext submissionContext; + private ClientServiceDelegate clientDelegate; private static final String failString = "Rejected job"; @Before public void setUp() throws Exception { resourceMgrDelegate = mock(ResourceMgrDelegate.class); conf = new YarnConfiguration(); - yarnRunner = new YARNRunner(conf, resourceMgrDelegate); + clientCache = new ClientCache(conf, resourceMgrDelegate); + clientCache = spy(clientCache); + yarnRunner = new YARNRunner(conf, resourceMgrDelegate, clientCache); yarnRunner = spy(yarnRunner); submissionContext = mock(ApplicationSubmissionContext.class); doAnswer( @@ -101,6 +128,31 @@ public ApplicationSubmissionContext answer(InvocationOnMock invocation) } + @Test + public void testJobKill() throws Exception { + clientDelegate = mock(ClientServiceDelegate.class); + when(clientDelegate.getJobStatus(any(JobID.class))).thenReturn(new + org.apache.hadoop.mapreduce.JobStatus(jobId, 0f, 0f, 0f, 0f, + State.PREP, JobPriority.HIGH, "tmp", "tmp", "tmp", "tmp")); + when(clientDelegate.killJob(any(JobID.class))).thenReturn(true); + doAnswer( + new Answer() { + @Override + public ClientServiceDelegate answer(InvocationOnMock invocation) + throws Throwable { + return clientDelegate; + } + } + ).when(clientCache).getClient(any(JobID.class)); + yarnRunner.killJob(jobId); + verify(resourceMgrDelegate).killApplication(appId); + when(clientDelegate.getJobStatus(any(JobID.class))).thenReturn(new + org.apache.hadoop.mapreduce.JobStatus(jobId, 0f, 0f, 0f, 0f, + State.RUNNING, JobPriority.HIGH, "tmp", "tmp", "tmp", "tmp")); + yarnRunner.killJob(jobId); + verify(clientDelegate).killJob(jobId); + } + @Test public void testJobSubmissionFailure() throws Exception { when(resourceMgrDelegate.submitApplication(any(ApplicationSubmissionContext.class))). @@ -122,4 +174,66 @@ public void testJobSubmissionFailure() throws Exception { assertTrue(io.getLocalizedMessage().contains(failString)); } } + + @Test + public void testResourceMgrDelegate() throws Exception { + /* we not want a mock of resourcemgr deleagte */ + ClientRMProtocol clientRMProtocol = mock(ClientRMProtocol.class); + ResourceMgrDelegate delegate = new ResourceMgrDelegate(conf, clientRMProtocol); + /* make sure kill calls finish application master */ + when(clientRMProtocol.forceKillApplication(any(KillApplicationRequest.class))) + .thenReturn(null); + delegate.killApplication(appId); + verify(clientRMProtocol).forceKillApplication(any(KillApplicationRequest.class)); + + /* make sure getalljobs calls get all applications */ + when(clientRMProtocol.getAllApplications(any(GetAllApplicationsRequest.class))). + thenReturn(recordFactory.newRecordInstance(GetAllApplicationsResponse.class)); + delegate.getAllJobs(); + verify(clientRMProtocol).getAllApplications(any(GetAllApplicationsRequest.class)); + + /* make sure getapplication report is called */ + when(clientRMProtocol.getApplicationReport(any(GetApplicationReportRequest.class))) + .thenReturn(recordFactory.newRecordInstance(GetApplicationReportResponse.class)); + delegate.getApplicationReport(appId); + verify(clientRMProtocol).getApplicationReport(any(GetApplicationReportRequest.class)); + + /* make sure metrics is called */ + GetClusterMetricsResponse clusterMetricsResponse = recordFactory.newRecordInstance + (GetClusterMetricsResponse.class); + clusterMetricsResponse.setClusterMetrics(recordFactory.newRecordInstance( + YarnClusterMetrics.class)); + when(clientRMProtocol.getClusterMetrics(any(GetClusterMetricsRequest.class))) + .thenReturn(clusterMetricsResponse); + delegate.getClusterMetrics(); + verify(clientRMProtocol).getClusterMetrics(any(GetClusterMetricsRequest.class)); + + when(clientRMProtocol.getClusterNodes(any(GetClusterNodesRequest.class))). + thenReturn(recordFactory.newRecordInstance(GetClusterNodesResponse.class)); + delegate.getActiveTrackers(); + verify(clientRMProtocol).getClusterNodes(any(GetClusterNodesRequest.class)); + + GetNewApplicationIdResponse newAppIdResponse = recordFactory.newRecordInstance( + GetNewApplicationIdResponse.class); + newAppIdResponse.setApplicationId(appId); + when(clientRMProtocol.getNewApplicationId(any(GetNewApplicationIdRequest.class))). + thenReturn(newAppIdResponse); + delegate.getNewJobID(); + verify(clientRMProtocol).getNewApplicationId(any(GetNewApplicationIdRequest.class)); + + GetQueueInfoResponse queueInfoResponse = recordFactory.newRecordInstance( + GetQueueInfoResponse.class); + queueInfoResponse.setQueueInfo(recordFactory.newRecordInstance(QueueInfo.class)); + when(clientRMProtocol.getQueueInfo(any(GetQueueInfoRequest.class))). + thenReturn(queueInfoResponse); + delegate.getQueues(); + verify(clientRMProtocol).getQueueInfo(any(GetQueueInfoRequest.class)); + + GetQueueUserAclsInfoResponse aclResponse = recordFactory.newRecordInstance( + GetQueueUserAclsInfoResponse.class); + when(clientRMProtocol.getQueueUserAcls(any(GetQueueUserAclsInfoRequest.class))) + .thenReturn(aclResponse); + delegate.getQueueAclsForCurrentUser(); + verify(clientRMProtocol).getQueueUserAcls(any(GetQueueUserAclsInfoRequest.class)); + } } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ClientRMProtocol.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ClientRMProtocol.java index db4c4790cf..f16cb6da4a 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ClientRMProtocol.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ClientRMProtocol.java @@ -21,8 +21,8 @@ import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Stable; -import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationRequest; -import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; @@ -102,7 +102,7 @@ public SubmitApplicationResponse submitApplication( *

The interface used by clients to request the * ResourceManager to abort submitted application.

* - *

The client, via {@link FinishApplicationRequest} provides the + *

The client, via {@link KillApplicationRequest} provides the * {@link ApplicationId} of the application to be aborted.

* *

In secure mode,the ResourceManager verifies access to the @@ -117,8 +117,8 @@ public SubmitApplicationResponse submitApplication( * @throws YarnRemoteException * @see #getQueueUserAcls(GetQueueUserAclsInfoRequest) */ - public FinishApplicationResponse finishApplication( - FinishApplicationRequest request) + public KillApplicationResponse forceKillApplication( + KillApplicationRequest request) throws YarnRemoteException; /** diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/FinishApplicationRequest.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/KillApplicationRequest.java similarity index 94% rename from hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/FinishApplicationRequest.java rename to hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/KillApplicationRequest.java index 023ee3c4ac..c033e64bb2 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/FinishApplicationRequest.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/KillApplicationRequest.java @@ -32,11 +32,11 @@ *

The request includes the {@link ApplicationId} of the application to be * aborted.

* - * @see ClientRMProtocol#finishApplication(FinishApplicationRequest) + * @see ClientRMProtocol#forceKillApplication(KillApplicationRequest) */ @Public @Stable -public interface FinishApplicationRequest { +public interface KillApplicationRequest { /** * Get the ApplicationId of the application to be aborted. * @return ApplicationId of the application to be aborted diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/FinishApplicationResponse.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/KillApplicationResponse.java similarity index 91% rename from hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/FinishApplicationResponse.java rename to hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/KillApplicationResponse.java index cd0c728e53..2a8d0f06d2 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/FinishApplicationResponse.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/KillApplicationResponse.java @@ -28,10 +28,10 @@ * *

Currently it's empty.

* - * @see ClientRMProtocol#finishApplication(FinishApplicationRequest) + * @see ClientRMProtocol#forceKillApplication(KillApplicationRequest) */ @Public @Stable -public interface FinishApplicationResponse { +public interface KillApplicationResponse { } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/FinishApplicationRequestPBImpl.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/KillApplicationRequestPBImpl.java similarity index 74% rename from hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/FinishApplicationRequestPBImpl.java rename to hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/KillApplicationRequestPBImpl.java index 044382bdde..e2761a090b 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/FinishApplicationRequestPBImpl.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/KillApplicationRequestPBImpl.java @@ -19,34 +19,34 @@ package org.apache.hadoop.yarn.api.protocolrecords.impl.pb; -import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ProtoBase; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; -import org.apache.hadoop.yarn.proto.YarnServiceProtos.FinishApplicationRequestProto; -import org.apache.hadoop.yarn.proto.YarnServiceProtos.FinishApplicationRequestProtoOrBuilder; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.KillApplicationRequestProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.KillApplicationRequestProtoOrBuilder; -public class FinishApplicationRequestPBImpl extends ProtoBase implements FinishApplicationRequest { - FinishApplicationRequestProto proto = FinishApplicationRequestProto.getDefaultInstance(); - FinishApplicationRequestProto.Builder builder = null; +public class KillApplicationRequestPBImpl extends ProtoBase implements KillApplicationRequest { + KillApplicationRequestProto proto = KillApplicationRequestProto.getDefaultInstance(); + KillApplicationRequestProto.Builder builder = null; boolean viaProto = false; private ApplicationId applicationId = null; - public FinishApplicationRequestPBImpl() { - builder = FinishApplicationRequestProto.newBuilder(); + public KillApplicationRequestPBImpl() { + builder = KillApplicationRequestProto.newBuilder(); } - public FinishApplicationRequestPBImpl(FinishApplicationRequestProto proto) { + public KillApplicationRequestPBImpl(KillApplicationRequestProto proto) { this.proto = proto; viaProto = true; } - public FinishApplicationRequestProto getProto() { + public KillApplicationRequestProto getProto() { mergeLocalToProto(); proto = viaProto ? proto : builder.build(); viaProto = true; @@ -69,7 +69,7 @@ private void mergeLocalToProto() { private void maybeInitBuilder() { if (viaProto || builder == null) { - builder = FinishApplicationRequestProto.newBuilder(proto); + builder = KillApplicationRequestProto.newBuilder(proto); } viaProto = false; } @@ -77,7 +77,7 @@ private void maybeInitBuilder() { @Override public ApplicationId getApplicationId() { - FinishApplicationRequestProtoOrBuilder p = viaProto ? proto : builder; + KillApplicationRequestProtoOrBuilder p = viaProto ? proto : builder; if (this.applicationId != null) { return this.applicationId; } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/FinishApplicationResponsePBImpl.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/KillApplicationResponsePBImpl.java similarity index 62% rename from hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/FinishApplicationResponsePBImpl.java rename to hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/KillApplicationResponsePBImpl.java index b8ad6dd7ea..61c42fd20d 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/FinishApplicationResponsePBImpl.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/KillApplicationResponsePBImpl.java @@ -19,27 +19,27 @@ package org.apache.hadoop.yarn.api.protocolrecords.impl.pb; -import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; import org.apache.hadoop.yarn.api.records.ProtoBase; -import org.apache.hadoop.yarn.proto.YarnServiceProtos.FinishApplicationResponseProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.KillApplicationResponseProto; -public class FinishApplicationResponsePBImpl extends ProtoBase implements FinishApplicationResponse { - FinishApplicationResponseProto proto = FinishApplicationResponseProto.getDefaultInstance(); - FinishApplicationResponseProto.Builder builder = null; +public class KillApplicationResponsePBImpl extends ProtoBase implements KillApplicationResponse { + KillApplicationResponseProto proto = KillApplicationResponseProto.getDefaultInstance(); + KillApplicationResponseProto.Builder builder = null; boolean viaProto = false; - public FinishApplicationResponsePBImpl() { - builder = FinishApplicationResponseProto.newBuilder(); + public KillApplicationResponsePBImpl() { + builder = KillApplicationResponseProto.newBuilder(); } - public FinishApplicationResponsePBImpl(FinishApplicationResponseProto proto) { + public KillApplicationResponsePBImpl(KillApplicationResponseProto proto) { this.proto = proto; viaProto = true; } - public FinishApplicationResponseProto getProto() { + public KillApplicationResponseProto getProto() { proto = viaProto ? proto : builder.build(); viaProto = true; return proto; @@ -47,7 +47,7 @@ public FinishApplicationResponseProto getProto() { private void maybeInitBuilder() { if (viaProto || builder == null) { - builder = FinishApplicationResponseProto.newBuilder(proto); + builder = KillApplicationResponseProto.newBuilder(proto); } viaProto = false; } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/client_RM_protocol.proto b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/client_RM_protocol.proto index cfb14ff351..760b75c021 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/client_RM_protocol.proto +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/client_RM_protocol.proto @@ -27,7 +27,7 @@ service ClientRMProtocolService { rpc getNewApplicationId (GetNewApplicationIdRequestProto) returns (GetNewApplicationIdResponseProto); rpc getApplicationReport (GetApplicationReportRequestProto) returns (GetApplicationReportResponseProto); rpc submitApplication (SubmitApplicationRequestProto) returns (SubmitApplicationResponseProto); - rpc finishApplication (FinishApplicationRequestProto) returns (FinishApplicationResponseProto); + rpc forceKillApplication (KillApplicationRequestProto) returns (KillApplicationResponseProto); rpc getClusterMetrics (GetClusterMetricsRequestProto) returns (GetClusterMetricsResponseProto); rpc getAllApplications (GetAllApplicationsRequestProto) returns (GetAllApplicationsResponseProto); rpc getClusterNodes (GetClusterNodesRequestProto) returns (GetClusterNodesResponseProto); diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto index 753c6b8c9a..84003ad439 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto @@ -88,11 +88,11 @@ message SubmitApplicationRequestProto { message SubmitApplicationResponseProto { } -message FinishApplicationRequestProto { +message KillApplicationRequestProto { optional ApplicationIdProto application_id = 1; } -message FinishApplicationResponseProto { +message KillApplicationResponseProto { } message GetClusterMetricsRequestProto { diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ClientRMProtocolPBClientImpl.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ClientRMProtocolPBClientImpl.java index 8972c656d9..b06c2caa35 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ClientRMProtocolPBClientImpl.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ClientRMProtocolPBClientImpl.java @@ -25,8 +25,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.yarn.api.ClientRMProtocol; -import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationRequest; -import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; @@ -41,10 +39,10 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; -import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.FinishApplicationRequestPBImpl; -import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.FinishApplicationResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetAllApplicationsRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetAllApplicationsResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetApplicationReportRequestPBImpl; @@ -59,21 +57,22 @@ import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetQueueInfoResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetQueueUserAclsInfoRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetQueueUserAclsInfoResponsePBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.KillApplicationRequestPBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.KillApplicationResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationResponsePBImpl; import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.ipc.ProtoOverHadoopRpcEngine; import org.apache.hadoop.yarn.proto.ClientRMProtocol.ClientRMProtocolService; -import org.apache.hadoop.yarn.proto.YarnServiceProtos.FinishApplicationRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetAllApplicationsRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetApplicationReportRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetClusterMetricsRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetClusterNodesRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetNewApplicationIdRequestProto; -import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetQueueUserAclsInfoRequestProto; -import org.apache.hadoop.yarn.proto.YarnServiceProtos.SubmitApplicationRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetQueueInfoRequestProto; -import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetQueueInfoResponseProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetQueueUserAclsInfoRequestProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.KillApplicationRequestProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.SubmitApplicationRequestProto; import com.google.protobuf.ServiceException; @@ -88,11 +87,11 @@ public ClientRMProtocolPBClientImpl(long clientVersion, InetSocketAddress addr, } @Override - public FinishApplicationResponse finishApplication( - FinishApplicationRequest request) throws YarnRemoteException { - FinishApplicationRequestProto requestProto = ((FinishApplicationRequestPBImpl)request).getProto(); + public KillApplicationResponse forceKillApplication( + KillApplicationRequest request) throws YarnRemoteException { + KillApplicationRequestProto requestProto = ((KillApplicationRequestPBImpl)request).getProto(); try { - return new FinishApplicationResponsePBImpl(proxy.finishApplication(null, requestProto)); + return new KillApplicationResponsePBImpl(proxy.forceKillApplication(null, requestProto)); } catch (ServiceException e) { if (e.getCause() instanceof YarnRemoteException) { throw (YarnRemoteException)e.getCause(); diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ClientRMProtocolPBServiceImpl.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ClientRMProtocolPBServiceImpl.java index 35e4be5398..242e9624a1 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ClientRMProtocolPBServiceImpl.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ClientRMProtocolPBServiceImpl.java @@ -19,7 +19,6 @@ package org.apache.hadoop.yarn.api.impl.pb.service; import org.apache.hadoop.yarn.api.ClientRMProtocol; -import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse; @@ -27,9 +26,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationIdResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; -import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.FinishApplicationRequestPBImpl; -import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.FinishApplicationResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetAllApplicationsRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetAllApplicationsResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetApplicationReportRequestPBImpl; @@ -44,12 +42,12 @@ import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetQueueInfoResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetQueueUserAclsInfoRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetQueueUserAclsInfoResponsePBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.KillApplicationRequestPBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.KillApplicationResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationResponsePBImpl; import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.proto.ClientRMProtocol.ClientRMProtocolService.BlockingInterface; -import org.apache.hadoop.yarn.proto.YarnServiceProtos.FinishApplicationRequestProto; -import org.apache.hadoop.yarn.proto.YarnServiceProtos.FinishApplicationResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetAllApplicationsRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetAllApplicationsResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetApplicationReportRequestProto; @@ -64,6 +62,8 @@ import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetQueueInfoResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetQueueUserAclsInfoRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetQueueUserAclsInfoResponseProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.KillApplicationRequestProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.KillApplicationResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.SubmitApplicationRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.SubmitApplicationResponseProto; @@ -79,12 +79,12 @@ public ClientRMProtocolPBServiceImpl(ClientRMProtocol impl) { } @Override - public FinishApplicationResponseProto finishApplication(RpcController arg0, - FinishApplicationRequestProto proto) throws ServiceException { - FinishApplicationRequestPBImpl request = new FinishApplicationRequestPBImpl(proto); + public KillApplicationResponseProto forceKillApplication(RpcController arg0, + KillApplicationRequestProto proto) throws ServiceException { + KillApplicationRequestPBImpl request = new KillApplicationRequestPBImpl(proto); try { - FinishApplicationResponse response = real.finishApplication(request); - return ((FinishApplicationResponsePBImpl)response).getProto(); + KillApplicationResponse response = real.forceKillApplication(request); + return ((KillApplicationResponsePBImpl)response).getProto(); } catch (YarnRemoteException e) { throw new ServiceException(e); } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java index a31bef8af9..8e29b33ff8 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java @@ -36,8 +36,8 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.yarn.api.ClientRMProtocol; -import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationRequest; -import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; @@ -228,8 +228,8 @@ public SubmitApplicationResponse submitApplication( @SuppressWarnings("unchecked") @Override - public FinishApplicationResponse finishApplication( - FinishApplicationRequest request) throws YarnRemoteException { + public KillApplicationResponse forceKillApplication( + KillApplicationRequest request) throws YarnRemoteException { ApplicationId applicationId = request.getApplicationId(); @@ -262,8 +262,8 @@ public FinishApplicationResponse finishApplication( RMAuditLogger.logSuccess(callerUGI.getShortUserName(), AuditConstants.KILL_APP_REQUEST, "ClientRMService" , applicationId); - FinishApplicationResponse response = recordFactory - .newRecordInstance(FinishApplicationResponse.class); + KillApplicationResponse response = recordFactory + .newRecordInstance(KillApplicationResponse.class); return response; } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 838dfc0b08..94d04a8d12 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService; import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEvent; import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType; @@ -86,7 +87,8 @@ public class RMAppImpl implements RMApp { private long startTime; private long finishTime; private RMAppAttempt currentAttempt; - + @SuppressWarnings("rawtypes") + private EventHandler handler; private static final FinalTransition FINAL_TRANSITION = new FinalTransition(); private static final StateMachineFactory(RMAppState.NEW) - // TODO - ATTEMPT_KILLED not sent right now but should handle if - // attempt starts sending - // Transitions from NEW state .addTransition(RMAppState.NEW, RMAppState.SUBMITTED, RMAppEventType.START, new StartAppAttemptTransition()) @@ -116,7 +115,7 @@ RMAppEventType.APP_REJECTED, new AppRejectedTransition()) .addTransition(RMAppState.SUBMITTED, RMAppState.ACCEPTED, RMAppEventType.APP_ACCEPTED) .addTransition(RMAppState.SUBMITTED, RMAppState.KILLED, - RMAppEventType.KILL, new AppKilledTransition()) + RMAppEventType.KILL, new KillAppAndAttemptTransition()) // Transitions from ACCEPTED state .addTransition(RMAppState.ACCEPTED, RMAppState.RUNNING, @@ -126,7 +125,7 @@ RMAppEventType.KILL, new AppKilledTransition()) RMAppEventType.ATTEMPT_FAILED, new AttemptFailedTransition(RMAppState.SUBMITTED)) .addTransition(RMAppState.ACCEPTED, RMAppState.KILLED, - RMAppEventType.KILL, new AppKilledTransition()) + RMAppEventType.KILL, new KillAppAndAttemptTransition()) // Transitions from RUNNING state .addTransition(RMAppState.RUNNING, RMAppState.FINISHED, @@ -136,7 +135,7 @@ RMAppEventType.KILL, new AppKilledTransition()) RMAppEventType.ATTEMPT_FAILED, new AttemptFailedTransition(RMAppState.SUBMITTED)) .addTransition(RMAppState.RUNNING, RMAppState.KILLED, - RMAppEventType.KILL, new AppKilledTransition()) + RMAppEventType.KILL, new KillAppAndAttemptTransition()) // Transitions from FINISHED state .addTransition(RMAppState.FINISHED, RMAppState.FINISHED, @@ -168,6 +167,7 @@ public RMAppImpl(ApplicationId applicationId, RMContext rmContext, this.name = name; this.rmContext = rmContext; this.dispatcher = rmContext.getDispatcher(); + this.handler = dispatcher.getEventHandler(); this.conf = config; this.user = user; this.queue = queue; @@ -403,7 +403,7 @@ private void createNewAttempt() { submissionContext); attempts.put(appAttemptId, attempt); currentAttempt = attempt; - dispatcher.getEventHandler().handle( + handler.handle( new RMAppAttemptEvent(appAttemptId, RMAppAttemptEventType.START)); } @@ -420,13 +420,23 @@ public void transition(RMAppImpl app, RMAppEvent event) { }; } - private static final class AppKilledTransition extends FinalTransition { + private static class AppKilledTransition extends FinalTransition { + @Override public void transition(RMAppImpl app, RMAppEvent event) { app.diagnostics.append("Application killed by user."); super.transition(app, event); }; } + private static class KillAppAndAttemptTransition extends AppKilledTransition { + @SuppressWarnings("unchecked") + @Override + public void transition(RMAppImpl app, RMAppEvent event) { + app.handler.handle(new RMAppAttemptEvent(app.currentAttempt.getAppAttemptId(), + RMAppAttemptEventType.KILL)); + super.transition(app, event); + } + } private static final class AppRejectedTransition extends FinalTransition{ public void transition(RMAppImpl app, RMAppEvent event) { @@ -450,11 +460,11 @@ private Set getNodesOnWhichAttemptRan(RMAppImpl app) { public void transition(RMAppImpl app, RMAppEvent event) { Set nodes = getNodesOnWhichAttemptRan(app); for (NodeId nodeId : nodes) { - app.dispatcher.getEventHandler().handle( + app.handler.handle( new RMNodeCleanAppEvent(nodeId, app.applicationId)); } app.finishTime = System.currentTimeMillis(); - app.dispatcher.getEventHandler().handle( + app.handler.handle( new RMAppManagerEvent(app.applicationId, RMAppManagerEventType.APP_COMPLETED)); }; diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index 2123ee806c..fa2ca44d30 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -22,7 +22,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.ClientRMProtocol; -import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationIdRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationIdResponse; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; @@ -109,9 +109,9 @@ public MockNM registerNode(String nodeIdStr, int memory) throws Exception { public void killApp(ApplicationId appId) throws Exception { ClientRMProtocol client = getClientRMService(); - FinishApplicationRequest req = Records.newRecord(FinishApplicationRequest.class); + KillApplicationRequest req = Records.newRecord(KillApplicationRequest.class); req.setApplicationId(appId); - client.finishApplication(req); + client.forceKillApplication(req); } //from AMLauncher diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/InlineDispatcher.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/InlineDispatcher.java index 32f6a429ca..d771a61d86 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/InlineDispatcher.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/InlineDispatcher.java @@ -1,52 +1,57 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ package org.apache.hadoop.yarn.server.resourcemanager.resourcetracker; -import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventHandler; -@Private public class InlineDispatcher extends AsyncDispatcher { - private class InlineEventHandler implements EventHandler { - private final InlineDispatcher dispatcher; - public InlineEventHandler(InlineDispatcher dispatcher) { - this.dispatcher = dispatcher; - } + private static final Log LOG = LogFactory.getLog(InlineDispatcher.class); + + private class TestEventHandler implements EventHandler { @Override public void handle(Event event) { - this.dispatcher.dispatch(event); + dispatch(event); } } - public void dispatch(Event event) { - super.dispatch(event); + @Override + protected void dispatch(Event event) { + LOG.info("Dispatching the event " + event.getClass().getName() + "." + + event.toString()); + + Class type = event.getType().getDeclaringClass(); + if (eventDispatchers.get(type) != null) { + eventDispatchers.get(type).handle(event); + } } @Override public EventHandler getEventHandler() { - return new InlineEventHandler(this); + return new TestEventHandler(); } - + static class EmptyEventHandler implements EventHandler { @Override public void handle(Event event) { - ; // ignore - } + //do nothing + } } } \ No newline at end of file diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java index 539207db3d..24408821e2 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java @@ -1,26 +1,27 @@ /** -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.hadoop.yarn.server.resourcemanager.rmapp; import static org.mockito.Mockito.mock; import java.io.IOException; +import java.util.List; import junit.framework.Assert; @@ -37,12 +38,14 @@ import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore; +import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.InlineDispatcher; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; @@ -52,27 +55,40 @@ public class TestRMAppTransitions { - private static final Log LOG = LogFactory.getLog(TestRMAppTransitions.class); - + static final Log LOG = LogFactory.getLog(TestRMAppTransitions.class); + private RMContext rmContext; private static int maxRetries = 4; private static int appId = 1; + private AsyncDispatcher rmDispatcher; // ignore all the RM application attempt events private static final class TestApplicationAttemptEventDispatcher implements - EventHandler { + EventHandler { - public TestApplicationAttemptEventDispatcher() { + private final RMContext rmContext; + public TestApplicationAttemptEventDispatcher(RMContext rmContext) { + this.rmContext = rmContext; } @Override public void handle(RMAppAttemptEvent event) { + ApplicationId appId = event.getApplicationAttemptId().getApplicationId(); + RMApp rmApp = this.rmContext.getRMApps().get(appId); + if (rmApp != null) { + try { + rmApp.getRMAppAttempt(event.getApplicationAttemptId()).handle(event); + } catch (Throwable t) { + LOG.error("Error in handling event type " + event.getType() + + " for application " + appId, t); + } + } } } // handle all the RM application events - same as in ResourceManager.java private static final class TestApplicationEventDispatcher implements - EventHandler { + EventHandler { private final RMContext rmContext; public TestApplicationEventDispatcher(RMContext rmContext) { @@ -97,18 +113,22 @@ public void handle(RMAppEvent event) { @Before public void setUp() throws Exception { AsyncDispatcher rmDispatcher = new AsyncDispatcher(); + Configuration conf = new Configuration(); + rmDispatcher = new InlineDispatcher(); ContainerAllocationExpirer containerAllocationExpirer = mock(ContainerAllocationExpirer.class); AMLivelinessMonitor amLivelinessMonitor = mock(AMLivelinessMonitor.class); this.rmContext = new RMContextImpl(new MemStore(), rmDispatcher, - containerAllocationExpirer, amLivelinessMonitor); + containerAllocationExpirer, amLivelinessMonitor); rmDispatcher.register(RMAppAttemptEventType.class, - new TestApplicationAttemptEventDispatcher()); + new TestApplicationAttemptEventDispatcher(this.rmContext)); rmDispatcher.register(RMAppEventType.class, new TestApplicationEventDispatcher(rmContext)); + rmDispatcher.init(conf); + rmDispatcher.start(); } protected RMApp createNewTestApp() { @@ -128,10 +148,10 @@ protected RMApp createNewTestApp() { new ApplicationTokenSecretManager(), scheduler); RMApp application = new RMAppImpl(applicationId, rmContext, - conf, name, user, - queue, submissionContext, clientTokenStr, - appStore, scheduler, - masterService); + conf, name, user, + queue, submissionContext, clientTokenStr, + appStore, scheduler, + masterService); testAppStartState(applicationId, user, name, queue, application); return application; @@ -193,6 +213,14 @@ private static void assertKilled(RMApp application) { "Application killed by user.", diag.toString()); } + private static void assertAppAndAttemptKilled(RMApp application) { + assertKilled(application); + /* also check if the attempt is killed */ + Assert.assertEquals( RMAppAttemptState.KILLED, + application.getCurrentAppAttempt().getAppAttemptState() + ); + } + private static void assertFailed(RMApp application, String regex) { assertTimesAtFinish(application); assertAppState(RMAppState.FAILED, application); @@ -298,10 +326,10 @@ public void testAppSubmittedKill() throws IOException { RMApp application = testCreateAppAccepted(); // SUBMITTED => KILLED event RMAppEventType.KILL - RMAppEvent event = - new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL); + RMAppEvent event = new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL); + this.rmContext.getRMApps().putIfAbsent(application.getApplicationId(), application); application.handle(event); - assertKilled(application); + assertAppAndAttemptKilled(application); } @Test