diff --git a/hadoop-mapreduce/CHANGES.txt b/hadoop-mapreduce/CHANGES.txt
index fbf710bfb0..04c6d825be 100644
--- a/hadoop-mapreduce/CHANGES.txt
+++ b/hadoop-mapreduce/CHANGES.txt
@@ -1127,6 +1127,9 @@ Trunk (unreleased changes)
MAPREDUCE-2868. ant build broken in hadoop-mapreduce dir (mahadev, giri and arun via mahadev)
+ MAPREDUCE-2649. Handling of finished applications in RM. (Thomas Graves
+ via acmurthy)
+
Release 0.22.0 - Unreleased
INCOMPATIBLE CHANGES
diff --git a/hadoop-mapreduce/hadoop-mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java b/hadoop-mapreduce/hadoop-mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
index d8a7e4d60c..6e4d1c5611 100644
--- a/hadoop-mapreduce/hadoop-mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
+++ b/hadoop-mapreduce/hadoop-mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
@@ -244,8 +244,8 @@ public class YARNRunner implements ClientProtocol {
ApplicationReport appMaster = resMgrDelegate
.getApplicationReport(applicationId);
- if (appMaster.getState() == ApplicationState.FAILED || appMaster.getState() ==
- ApplicationState.KILLED) {
+ if (appMaster == null || appMaster.getState() == ApplicationState.FAILED
+ || appMaster.getState() == ApplicationState.KILLED) {
throw RPCUtil.getRemoteException("failed to run job");
}
return clientServiceDelegate.getJobStatus(jobId);
diff --git a/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/resources/yarn-default.xml b/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/resources/yarn-default.xml
index fa9a74a0a4..eded765245 100644
--- a/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/resources/yarn-default.xml
+++ b/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/resources/yarn-default.xml
@@ -45,6 +45,14 @@
/etc/krb5.keytab
+
+ yarn.server.resourcemanager.expire.applications.completed.max
+ 10000
+ the maximum number of completed applications the RM
+ keeps in memory
+
+
+
diff --git a/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java b/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
index df7a9e6207..83878c0cd7 100644
--- a/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
+++ b/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
@@ -36,7 +36,6 @@ import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityInfo;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
-import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.ClientRMProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationResponse;
@@ -70,10 +69,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier;
import org.apache.hadoop.yarn.security.client.ClientRMSecurityInfo;
-import org.apache.hadoop.yarn.security.client.ClientToAMSecretManager;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
@@ -97,8 +93,6 @@ public class ClientRMService extends AbstractService implements
final private AtomicInteger applicationCounter = new AtomicInteger(0);
final private YarnScheduler scheduler;
final private RMContext rmContext;
- private final ApplicationMasterService masterService;
- private final ClientToAMSecretManager clientToAMSecretManager;
private final AMLivelinessMonitor amLivelinessMonitor;
private String clientServiceBindAddress;
@@ -109,15 +103,11 @@ public class ClientRMService extends AbstractService implements
private ApplicationACLsManager aclsManager;
private Map applicationACLs;
- public ClientRMService(RMContext rmContext,
- ClientToAMSecretManager clientToAMSecretManager,
- YarnScheduler scheduler, ApplicationMasterService masterService) {
+ public ClientRMService(RMContext rmContext, YarnScheduler scheduler) {
super(ClientRMService.class.getName());
this.scheduler = scheduler;
this.rmContext = rmContext;
- this.masterService = masterService;
this.amLivelinessMonitor = rmContext.getAMLivelinessMonitor();
- this.clientToAMSecretManager = clientToAMSecretManager;
}
@Override
@@ -206,42 +196,17 @@ public class ClientRMService extends AbstractService implements
ApplicationSubmissionContext submissionContext = request
.getApplicationSubmissionContext();
try {
-
- ApplicationId applicationId = submissionContext.getApplicationId();
- String clientTokenStr = null;
String user = UserGroupInformation.getCurrentUser().getShortUserName();
- if (UserGroupInformation.isSecurityEnabled()) {
- Token clientToken = new Token(
- new ApplicationTokenIdentifier(applicationId),
- this.clientToAMSecretManager);
- clientTokenStr = clientToken.encodeToUrlString();
- LOG.debug("Sending client token as " + clientTokenStr);
- }
-
- submissionContext.setQueue(submissionContext.getQueue() == null
- ? "default" : submissionContext.getQueue());
- submissionContext.setApplicationName(submissionContext
- .getApplicationName() == null ? "N/A" : submissionContext
- .getApplicationName());
-
- ApplicationStore appStore = rmContext.getApplicationsStore()
- .createApplicationStore(submissionContext.getApplicationId(),
- submissionContext);
- RMApp application = new RMAppImpl(applicationId, rmContext,
- getConfig(), submissionContext.getApplicationName(), user,
- submissionContext.getQueue(), submissionContext, clientTokenStr,
- appStore, this.amLivelinessMonitor, this.scheduler,
- this.masterService);
- if (rmContext.getRMApps().putIfAbsent(applicationId, application) != null) {
+ ApplicationId applicationId = submissionContext.getApplicationId();
+ if (rmContext.getRMApps().get(applicationId) != null) {
throw new IOException("Application with id " + applicationId
+ " is already present! Cannot add a duplicate!");
}
-
this.rmContext.getDispatcher().getEventHandler().handle(
- new RMAppEvent(applicationId, RMAppEventType.START));
+ new RMAppManagerSubmitEvent(submissionContext));
- LOG.info("Application with id " + applicationId.getId()
- + " submitted by user " + user + " with " + submissionContext);
+ LOG.info("Application with id " + applicationId.getId() +
+ " submitted by user " + user + " with " + submissionContext);
} catch (IOException ie) {
LOG.info("Exception in submitting application", ie);
throw RPCUtil.getRemoteException(ie);
diff --git a/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
new file mode 100644
index 0000000000..3219d8220d
--- /dev/null
+++ b/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.LinkedList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier;
+import org.apache.hadoop.yarn.security.ApplicationTokenSecretManager;
+import org.apache.hadoop.yarn.security.client.ClientToAMSecretManager;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
+
+/**
+ * This class manages the list of applications for the resource manager.
+ */
+public class RMAppManager implements EventHandler {
+
+ private static final Log LOG = LogFactory.getLog(RMAppManager.class);
+
+ private int completedAppsMax = RMConfig.DEFAULT_EXPIRE_APPLICATIONS_COMPLETED_MAX;
+ private LinkedList completedApps = new LinkedList();
+
+ private final RMContext rmContext;
+ private final ClientToAMSecretManager clientToAMSecretManager;
+ private final ApplicationMasterService masterService;
+ private final YarnScheduler scheduler;
+ private Configuration conf;
+
+ public RMAppManager(RMContext context, ClientToAMSecretManager
+ clientToAMSecretManager, YarnScheduler scheduler,
+ ApplicationMasterService masterService, Configuration conf) {
+ this.rmContext = context;
+ this.scheduler = scheduler;
+ this.clientToAMSecretManager = clientToAMSecretManager;
+ this.masterService = masterService;
+ this.conf = conf;
+ setCompletedAppsMax(conf.getInt(
+ RMConfig.EXPIRE_APPLICATIONS_COMPLETED_MAX,
+ RMConfig.DEFAULT_EXPIRE_APPLICATIONS_COMPLETED_MAX));
+ }
+
+ protected void setCompletedAppsMax(int max) {
+ this.completedAppsMax = max;
+ }
+
+ protected synchronized int getCompletedAppsListSize() {
+ return this.completedApps.size();
+ }
+
+ protected synchronized void addCompletedApp(ApplicationId appId) {
+ if (appId == null) {
+ LOG.error("RMAppManager received completed appId of null, skipping");
+ } else {
+ completedApps.add(appId);
+ }
+ };
+
+ /*
+ * check to see if hit the limit for max # completed apps kept
+ */
+ protected synchronized void checkAppNumCompletedLimit() {
+ while (completedApps.size() > this.completedAppsMax) {
+ ApplicationId removeId = completedApps.remove();
+ LOG.info("Application should be expired, max # apps"
+ + " met. Removing app: " + removeId);
+ rmContext.getRMApps().remove(removeId);
+ }
+ }
+
+ protected void submitApplication(ApplicationSubmissionContext submissionContext) {
+ ApplicationId applicationId = submissionContext.getApplicationId();
+ RMApp application = null;
+ try {
+ String clientTokenStr = null;
+ String user = UserGroupInformation.getCurrentUser().getShortUserName();
+ if (UserGroupInformation.isSecurityEnabled()) {
+ Token clientToken = new
+ Token(
+ new ApplicationTokenIdentifier(applicationId),
+ this.clientToAMSecretManager);
+ clientTokenStr = clientToken.encodeToUrlString();
+ LOG.debug("Sending client token as " + clientTokenStr);
+ }
+ submissionContext.setQueue(submissionContext.getQueue() == null
+ ? "default" : submissionContext.getQueue());
+ submissionContext.setApplicationName(submissionContext
+ .getApplicationName() == null ? "N/A" : submissionContext
+ .getApplicationName());
+ ApplicationStore appStore = rmContext.getApplicationsStore()
+ .createApplicationStore(submissionContext.getApplicationId(),
+ submissionContext);
+ application = new RMAppImpl(applicationId, rmContext,
+ this.conf, submissionContext.getApplicationName(), user,
+ submissionContext.getQueue(), submissionContext, clientTokenStr,
+ appStore, rmContext.getAMLivelinessMonitor(), this.scheduler,
+ this.masterService);
+
+ if (rmContext.getRMApps().putIfAbsent(applicationId, application) != null) {
+ LOG.info("Application with id " + applicationId +
+ " is already present! Cannot add a duplicate!");
+ // don't send event through dispatcher as it will be handled by app already
+ // present with this id.
+ application.handle(new RMAppRejectedEvent(applicationId,
+ "Application with this id is already present! Cannot add a duplicate!"));
+ } else {
+ this.rmContext.getDispatcher().getEventHandler().handle(
+ new RMAppEvent(applicationId, RMAppEventType.START));
+ }
+ } catch (IOException ie) {
+ LOG.info("RMAppManager submit application exception", ie);
+ if (application != null) {
+ this.rmContext.getDispatcher().getEventHandler().handle(
+ new RMAppRejectedEvent(applicationId, ie.getMessage()));
+ }
+ }
+ }
+
+ @Override
+ public void handle(RMAppManagerEvent event) {
+ ApplicationId appID = event.getApplicationId();
+ LOG.debug("RMAppManager processing event for "
+ + appID + " of type " + event.getType());
+ switch(event.getType()) {
+ case APP_COMPLETED:
+ {
+ addCompletedApp(appID);
+ checkAppNumCompletedLimit();
+ }
+ break;
+ case APP_SUBMIT:
+ {
+ ApplicationSubmissionContext submissionContext =
+ ((RMAppManagerSubmitEvent)event).getSubmissionContext();
+ submitApplication(submissionContext);
+ }
+ break;
+ default:
+ LOG.error("Invalid eventtype " + event.getType() + ". Ignoring!");
+ }
+ }
+}
diff --git a/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerEvent.java b/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerEvent.java
new file mode 100644
index 0000000000..7cddc174a3
--- /dev/null
+++ b/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerEvent.java
@@ -0,0 +1,18 @@
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.event.AbstractEvent;
+
+public class RMAppManagerEvent extends AbstractEvent {
+
+ private final ApplicationId appId;
+
+ public RMAppManagerEvent(ApplicationId appId, RMAppManagerEventType type) {
+ super(type);
+ this.appId = appId;
+ }
+
+ public ApplicationId getApplicationId() {
+ return this.appId;
+ }
+}
diff --git a/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerEventType.java b/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerEventType.java
new file mode 100644
index 0000000000..3b52ab14f3
--- /dev/null
+++ b/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerEventType.java
@@ -0,0 +1,6 @@
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+public enum RMAppManagerEventType {
+ APP_SUBMIT,
+ APP_COMPLETED
+}
diff --git a/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerSubmitEvent.java b/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerSubmitEvent.java
new file mode 100644
index 0000000000..bc206a2008
--- /dev/null
+++ b/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerSubmitEvent.java
@@ -0,0 +1,18 @@
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+
+public class RMAppManagerSubmitEvent extends RMAppManagerEvent {
+
+ private final ApplicationSubmissionContext submissionContext;
+
+ public RMAppManagerSubmitEvent(ApplicationSubmissionContext submissionContext) {
+ super(submissionContext.getApplicationId(), RMAppManagerEventType.APP_SUBMIT);
+ this.submissionContext = submissionContext;
+ }
+
+ public ApplicationSubmissionContext getSubmissionContext() {
+ return this.submissionContext;
+ }
+}
diff --git a/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMConfig.java b/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMConfig.java
index bf0fbcfc1f..c3fbf7610e 100644
--- a/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMConfig.java
+++ b/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMConfig.java
@@ -85,4 +85,9 @@ public class RMConfig {
public static final String RM_NODES_EXCLUDE_FILE =
YarnConfiguration.RM_PREFIX + "nodes.exclude";
public static final String DEFAULT_RM_NODES_EXCLUDE_FILE = "";
+
+ // the maximum number of completed applications RM keeps
+ public static String EXPIRE_APPLICATIONS_COMPLETED_MAX =
+ YarnConfiguration.RM_PREFIX + "expire.applications.completed.max";
+ public static final int DEFAULT_EXPIRE_APPLICATIONS_COMPLETED_MAX = 10000;
}
diff --git a/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index 029e810f61..553b98b52a 100644
--- a/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ b/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -98,6 +98,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
protected NMLivelinessMonitor nmLivelinessMonitor;
protected NodesListManager nodesListManager;
private SchedulerEventDispatcher schedulerDispatcher;
+ private RMAppManager rmAppManager;
private final AtomicBoolean shutdown = new AtomicBoolean(false);
private WebApp webApp;
@@ -176,6 +177,11 @@ public class ResourceManager extends CompositeService implements Recoverable {
clientRM = createClientRMService();
addService(clientRM);
+
+ this.rmAppManager = createRMAppManager();
+ // Register event handler for RMAppManagerEvents
+ this.rmDispatcher.register(RMAppManagerEventType.class,
+ this.rmAppManager);
adminService = createAdminService();
addService(adminService);
@@ -215,6 +221,11 @@ public class ResourceManager extends CompositeService implements Recoverable {
return new AMLivelinessMonitor(this.rmDispatcher);
}
+ protected RMAppManager createRMAppManager() {
+ return new RMAppManager(this.rmContext, this.clientToAMSecretManager,
+ this.scheduler, this.masterService, this.conf);
+ }
+
@Private
public static final class SchedulerEventDispatcher extends AbstractService
implements EventHandler {
@@ -429,8 +440,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
}
protected ClientRMService createClientRMService() {
- return new ClientRMService(this.rmContext, this.clientToAMSecretManager,
- scheduler, masterService);
+ return new ClientRMService(this.rmContext, scheduler);
}
protected ApplicationMasterService createApplicationMasterService() {
diff --git a/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
index cdab96ba33..97f6e7f515 100644
--- a/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
+++ b/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
@@ -24,6 +24,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
import org.apache.hadoop.yarn.server.resourcemanager.RMConfig;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
+import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
@@ -89,6 +91,8 @@ public class RMAppImpl implements RMApp {
RMAppEventType.START, new StartAppAttemptTransition())
.addTransition(RMAppState.NEW, RMAppState.KILLED, RMAppEventType.KILL,
new AppKilledTransition())
+ .addTransition(RMAppState.NEW, RMAppState.FAILED,
+ RMAppEventType.APP_REJECTED, new AppRejectedTransition())
// Transitions from SUBMITTED state
.addTransition(RMAppState.SUBMITTED, RMAppState.FAILED,
@@ -429,6 +433,9 @@ public class RMAppImpl implements RMApp {
new RMNodeCleanAppEvent(nodeId, app.applicationId));
}
app.finishTime = System.currentTimeMillis();
+ app.dispatcher.getEventHandler().handle(
+ new RMAppManagerEvent(app.applicationId,
+ RMAppManagerEventType.APP_COMPLETED));
};
}
diff --git a/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
index 03d88fd851..aeb1f81b91 100644
--- a/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
+++ b/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
@@ -42,8 +42,13 @@ public class MockRM extends ResourceManager {
public void waitForState(ApplicationId appId, RMAppState finalState)
throws Exception {
- RMApp app = getRMContext().getRMApps().get(appId);
int timeoutSecs = 0;
+ RMApp app = null;
+ while ((app == null) && timeoutSecs++ < 20) {
+ app = getRMContext().getRMApps().get(appId);
+ Thread.sleep(500);
+ }
+ timeoutSecs = 0;
while (!finalState.equals(app.getState()) &&
timeoutSecs++ < 20) {
System.out.println("App State is : " + app.getState() +
@@ -108,8 +113,7 @@ public class MockRM extends ResourceManager {
@Override
protected ClientRMService createClientRMService() {
- return new ClientRMService(getRMContext(),
- clientToAMSecretManager, getResourceScheduler(), masterService) {
+ return new ClientRMService(getRMContext(), getResourceScheduler()) {
@Override
public void start() {
//override to not start rpc handler
diff --git a/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java b/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
new file mode 100644
index 0000000000..3109198d97
--- /dev/null
+++ b/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
@@ -0,0 +1,463 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import static org.mockito.Mockito.*;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.MockApps;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.security.ApplicationTokenSecretManager;
+import org.apache.hadoop.yarn.security.client.ClientToAMSecretManager;
+import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.RMAppManager;
+import org.apache.hadoop.yarn.server.resourcemanager.RMConfig;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.service.Service;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Lists;
+
+/**
+ * Testing applications being retired from RM.
+ *
+ */
+
+public class TestAppManager{
+ private static final Log LOG = LogFactory.getLog(TestAppManager.class);
+ private static RMAppEventType appEventType = RMAppEventType.KILL;
+
+ public synchronized RMAppEventType getAppEventType() {
+ return appEventType;
+ }
+ public synchronized void setAppEventType(RMAppEventType newType) {
+ appEventType = newType;
+ }
+
+
+ public static List newRMApps(int n, long time, RMAppState state) {
+ List list = Lists.newArrayList();
+ for (int i = 0; i < n; ++i) {
+ list.add(new MockRMApp(i, time, state));
+ }
+ return list;
+ }
+
+ public static RMContext mockRMContext(int n, long time) {
+ final List apps = newRMApps(n, time, RMAppState.FINISHED);
+ final ConcurrentMap map = Maps.newConcurrentMap();
+ for (RMApp app : apps) {
+ map.put(app.getApplicationId(), app);
+ }
+ Dispatcher rmDispatcher = new AsyncDispatcher();
+ ContainerAllocationExpirer containerAllocationExpirer = new ContainerAllocationExpirer(
+ rmDispatcher);
+ AMLivelinessMonitor amLivelinessMonitor = new AMLivelinessMonitor(
+ rmDispatcher);
+ return new RMContextImpl(new MemStore(), rmDispatcher,
+ containerAllocationExpirer, amLivelinessMonitor) {
+ @Override
+ public ConcurrentMap getRMApps() {
+ return map;
+ }
+ };
+ }
+
+ public class TestAppManagerDispatcher implements
+ EventHandler {
+
+ private final RMContext rmContext;
+
+ public TestAppManagerDispatcher(RMContext rmContext) {
+ this.rmContext = rmContext;
+ }
+
+ @Override
+ public void handle(RMAppManagerEvent event) {
+ // do nothing
+ }
+ }
+
+ public class TestDispatcher implements
+ EventHandler {
+
+ private final RMContext rmContext;
+
+ public TestDispatcher(RMContext rmContext) {
+ this.rmContext = rmContext;
+ }
+
+ @Override
+ public void handle(RMAppEvent event) {
+ ApplicationId appID = event.getApplicationId();
+ //RMApp rmApp = this.rmContext.getRMApps().get(appID);
+ setAppEventType(event.getType());
+ System.out.println("in handle routine " + getAppEventType().toString());
+ }
+ }
+
+
+ // Extend and make the functions we want to test public
+ public class TestRMAppManager extends RMAppManager {
+
+ public TestRMAppManager(RMContext context, Configuration conf) {
+ super(context, null, null, null, conf);
+ setCompletedAppsMax(RMConfig.DEFAULT_EXPIRE_APPLICATIONS_COMPLETED_MAX);
+ }
+
+ public TestRMAppManager(RMContext context, ClientToAMSecretManager
+ clientToAMSecretManager, YarnScheduler scheduler,
+ ApplicationMasterService masterService, Configuration conf) {
+ super(context, clientToAMSecretManager, scheduler, masterService, conf);
+ setCompletedAppsMax(RMConfig.DEFAULT_EXPIRE_APPLICATIONS_COMPLETED_MAX);
+ }
+
+ public void checkAppNumCompletedLimit() {
+ super.checkAppNumCompletedLimit();
+ }
+
+ public void addCompletedApp(ApplicationId appId) {
+ super.addCompletedApp(appId);
+ }
+
+ public int getCompletedAppsListSize() {
+ return super.getCompletedAppsListSize();
+ }
+
+ public void setCompletedAppsMax(int max) {
+ super.setCompletedAppsMax(max);
+ }
+ public void submitApplication(ApplicationSubmissionContext submissionContext) {
+ super.submitApplication(submissionContext);
+ }
+ }
+
+ protected void addToCompletedApps(TestRMAppManager appMonitor, RMContext rmContext) {
+ for (RMApp app : rmContext.getRMApps().values()) {
+ if (app.getState() == RMAppState.FINISHED
+ || app.getState() == RMAppState.KILLED
+ || app.getState() == RMAppState.FAILED) {
+ appMonitor.addCompletedApp(app.getApplicationId());
+ }
+ }
+ }
+
+ @Test
+ public void testRMAppRetireNone() throws Exception {
+ long now = System.currentTimeMillis();
+
+ // Create such that none of the applications will retire since
+ // haven't hit max #
+ RMContext rmContext = mockRMContext(10, now - 10);
+ TestRMAppManager appMonitor = new TestRMAppManager(rmContext, new Configuration());
+
+ appMonitor.setCompletedAppsMax(10);
+
+ Assert.assertEquals("Number of apps incorrect before checkAppTimeLimit",
+ 10, rmContext.getRMApps().size());
+
+ // add them to completed apps list
+ addToCompletedApps(appMonitor, rmContext);
+
+ // shouldn't have to many apps
+ appMonitor.checkAppNumCompletedLimit();
+ Assert.assertEquals("Number of apps incorrect after # completed check", 10,
+ rmContext.getRMApps().size());
+ Assert.assertEquals("Number of completed apps incorrect after check", 10,
+ appMonitor.getCompletedAppsListSize());
+ }
+
+ @Test
+ public void testRMAppRetireSome() throws Exception {
+ long now = System.currentTimeMillis();
+
+ RMContext rmContext = mockRMContext(10, now - 20000);
+ TestRMAppManager appMonitor = new TestRMAppManager(rmContext, new Configuration());
+
+ appMonitor.setCompletedAppsMax(3);
+
+ Assert.assertEquals("Number of apps incorrect before", 10, rmContext
+ .getRMApps().size());
+
+ // add them to completed apps list
+ addToCompletedApps(appMonitor, rmContext);
+
+ // shouldn't have to many apps
+ appMonitor.checkAppNumCompletedLimit();
+ Assert.assertEquals("Number of apps incorrect after # completed check", 3,
+ rmContext.getRMApps().size());
+ Assert.assertEquals("Number of completed apps incorrect after check", 3,
+ appMonitor.getCompletedAppsListSize());
+ }
+
+ @Test
+ public void testRMAppRetireSomeDifferentStates() throws Exception {
+ long now = System.currentTimeMillis();
+
+ // these parameters don't matter, override applications below
+ RMContext rmContext = mockRMContext(10, now - 20000);
+ TestRMAppManager appMonitor = new TestRMAppManager(rmContext, new Configuration());
+
+ appMonitor.setCompletedAppsMax(2);
+
+ // clear out applications map
+ rmContext.getRMApps().clear();
+ Assert.assertEquals("map isn't empty", 0, rmContext.getRMApps().size());
+
+ // / set with various finished states
+ RMApp app = new MockRMApp(0, now - 20000, RMAppState.KILLED);
+ rmContext.getRMApps().put(app.getApplicationId(), app);
+ app = new MockRMApp(1, now - 200000, RMAppState.FAILED);
+ rmContext.getRMApps().put(app.getApplicationId(), app);
+ app = new MockRMApp(2, now - 30000, RMAppState.FINISHED);
+ rmContext.getRMApps().put(app.getApplicationId(), app);
+ app = new MockRMApp(3, now - 20000, RMAppState.RUNNING);
+ rmContext.getRMApps().put(app.getApplicationId(), app);
+ app = new MockRMApp(4, now - 20000, RMAppState.NEW);
+ rmContext.getRMApps().put(app.getApplicationId(), app);
+
+ // make sure it doesn't expire these since still running
+ app = new MockRMApp(5, now - 10001, RMAppState.KILLED);
+ rmContext.getRMApps().put(app.getApplicationId(), app);
+ app = new MockRMApp(6, now - 30000, RMAppState.ACCEPTED);
+ rmContext.getRMApps().put(app.getApplicationId(), app);
+ app = new MockRMApp(7, now - 20000, RMAppState.SUBMITTED);
+ rmContext.getRMApps().put(app.getApplicationId(), app);
+ app = new MockRMApp(8, now - 10001, RMAppState.FAILED);
+ rmContext.getRMApps().put(app.getApplicationId(), app);
+ app = new MockRMApp(9, now - 20000, RMAppState.FAILED);
+ rmContext.getRMApps().put(app.getApplicationId(), app);
+
+ Assert.assertEquals("Number of apps incorrect before", 10, rmContext
+ .getRMApps().size());
+
+ // add them to completed apps list
+ addToCompletedApps(appMonitor, rmContext);
+
+ // shouldn't have to many apps
+ appMonitor.checkAppNumCompletedLimit();
+ Assert.assertEquals("Number of apps incorrect after # completed check", 6,
+ rmContext.getRMApps().size());
+ Assert.assertEquals("Number of completed apps incorrect after check", 2,
+ appMonitor.getCompletedAppsListSize());
+
+ }
+
+ @Test
+ public void testRMAppRetireNullApp() throws Exception {
+ long now = System.currentTimeMillis();
+
+ RMContext rmContext = mockRMContext(10, now - 20000);
+ TestRMAppManager appMonitor = new TestRMAppManager(rmContext, new Configuration());
+
+ Assert.assertEquals("Number of apps incorrect before", 10, rmContext
+ .getRMApps().size());
+
+ appMonitor.addCompletedApp(null);
+
+ Assert.assertEquals("Number of completed apps incorrect after check", 0,
+ appMonitor.getCompletedAppsListSize());
+ }
+
+ @Test
+ public void testRMAppRetireZeroSetting() throws Exception {
+ long now = System.currentTimeMillis();
+
+ RMContext rmContext = mockRMContext(10, now - 20000);
+ TestRMAppManager appMonitor = new TestRMAppManager(rmContext, new Configuration());
+
+ Assert.assertEquals("Number of apps incorrect before", 10, rmContext
+ .getRMApps().size());
+
+ // test with 0
+ appMonitor.setCompletedAppsMax(0);
+
+ addToCompletedApps(appMonitor, rmContext);
+ Assert.assertEquals("Number of completed apps incorrect", 10,
+ appMonitor.getCompletedAppsListSize());
+
+ appMonitor.checkAppNumCompletedLimit();
+
+ Assert.assertEquals("Number of apps incorrect after # completed check", 0,
+ rmContext.getRMApps().size());
+ Assert.assertEquals("Number of completed apps incorrect after check", 0,
+ appMonitor.getCompletedAppsListSize());
+ }
+
+ protected void setupDispatcher(RMContext rmContext, Configuration conf) {
+ TestDispatcher testDispatcher = new TestDispatcher(rmContext);
+ TestAppManagerDispatcher testAppManagerDispatcher = new TestAppManagerDispatcher(rmContext);
+ rmContext.getDispatcher().register(RMAppEventType.class, testDispatcher);
+ rmContext.getDispatcher().register(RMAppManagerEventType.class, testAppManagerDispatcher);
+ ((Service)rmContext.getDispatcher()).init(conf);
+ ((Service)rmContext.getDispatcher()).start();
+ Assert.assertEquals("app event type is wrong before", RMAppEventType.KILL, appEventType);
+ }
+
+ @Test
+ public void testRMAppSubmit() throws Exception {
+ long now = System.currentTimeMillis();
+
+ RMContext rmContext = mockRMContext(0, now - 10);
+ ResourceScheduler scheduler = new CapacityScheduler();
+ ApplicationMasterService masterService = new ApplicationMasterService(rmContext,
+ new ApplicationTokenSecretManager(), scheduler);
+ Configuration conf = new Configuration();
+ TestRMAppManager appMonitor = new TestRMAppManager(rmContext,
+ new ClientToAMSecretManager(), scheduler, masterService, conf);
+
+ ApplicationId appID = MockApps.newAppID(1);
+ RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
+ ApplicationSubmissionContext context = recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
+ context.setApplicationId(appID);
+ setupDispatcher(rmContext, conf);
+
+ appMonitor.submitApplication(context);
+ RMApp app = rmContext.getRMApps().get(appID);
+ Assert.assertNotNull("app is null", app);
+ Assert.assertEquals("app id doesn't match", appID, app.getApplicationId());
+ Assert.assertEquals("app name doesn't match", "N/A", app.getName());
+ Assert.assertEquals("app queue doesn't match", "default", app.getQueue());
+ Assert.assertEquals("app state doesn't match", RMAppState.NEW, app.getState());
+ Assert.assertNotNull("app store is null", app.getApplicationStore());
+
+ // wait for event to be processed
+ int timeoutSecs = 0;
+ while ((getAppEventType() == RMAppEventType.KILL) &&
+ timeoutSecs++ < 20) {
+ Thread.sleep(1000);
+ }
+ Assert.assertEquals("app event type sent is wrong", RMAppEventType.START, getAppEventType());
+ setAppEventType(RMAppEventType.KILL);
+ ((Service)rmContext.getDispatcher()).stop();
+ }
+
+ @Test
+ public void testRMAppSubmitWithQueueAndName() throws Exception {
+ long now = System.currentTimeMillis();
+
+ RMContext rmContext = mockRMContext(1, now - 10);
+ ResourceScheduler scheduler = new CapacityScheduler();
+ ApplicationMasterService masterService = new ApplicationMasterService(rmContext,
+ new ApplicationTokenSecretManager(), scheduler);
+ Configuration conf = new Configuration();
+ TestRMAppManager appMonitor = new TestRMAppManager(rmContext,
+ new ClientToAMSecretManager(), scheduler, masterService, conf);
+
+ ApplicationId appID = MockApps.newAppID(10);
+ RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
+ ApplicationSubmissionContext context = recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
+ context.setApplicationId(appID);
+ context.setApplicationName("testApp1");
+ context.setQueue("testQueue");
+
+ setupDispatcher(rmContext, conf);
+
+ appMonitor.submitApplication(context);
+ RMApp app = rmContext.getRMApps().get(appID);
+ Assert.assertNotNull("app is null", app);
+ Assert.assertEquals("app id doesn't match", appID, app.getApplicationId());
+ Assert.assertEquals("app name doesn't match", "testApp1", app.getName());
+ Assert.assertEquals("app queue doesn't match", "testQueue", app.getQueue());
+ Assert.assertEquals("app state doesn't match", RMAppState.NEW, app.getState());
+ Assert.assertNotNull("app store is null", app.getApplicationStore());
+
+ // wait for event to be processed
+ int timeoutSecs = 0;
+ while ((getAppEventType() == RMAppEventType.KILL) &&
+ timeoutSecs++ < 20) {
+ Thread.sleep(1000);
+ }
+ Assert.assertEquals("app event type sent is wrong", RMAppEventType.START, getAppEventType());
+ setAppEventType(RMAppEventType.KILL);
+ ((Service)rmContext.getDispatcher()).stop();
+ }
+
+ @Test
+ public void testRMAppSubmitError() throws Exception {
+ long now = System.currentTimeMillis();
+
+ // specify 1 here and use same appId below so it gets duplicate entry
+ RMContext rmContext = mockRMContext(1, now - 10);
+ ResourceScheduler scheduler = new CapacityScheduler();
+ ApplicationMasterService masterService = new ApplicationMasterService(rmContext,
+ new ApplicationTokenSecretManager(), scheduler);
+ Configuration conf = new Configuration();
+ TestRMAppManager appMonitor = new TestRMAppManager(rmContext,
+ new ClientToAMSecretManager(), scheduler, masterService, conf);
+
+ ApplicationId appID = MockApps.newAppID(0);
+ RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
+ ApplicationSubmissionContext context = recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
+ context.setApplicationId(appID);
+ context.setApplicationName("testApp1");
+ context.setQueue("testQueue");
+
+ setupDispatcher(rmContext, conf);
+
+ RMApp appOrig = rmContext.getRMApps().get(appID);
+ Assert.assertTrue("app name matches but shouldn't", "testApp1" != appOrig.getName());
+
+ // our testApp1 should be rejected and original app with same id should be left in place
+ appMonitor.submitApplication(context);
+
+ // make sure original app didn't get removed
+ RMApp app = rmContext.getRMApps().get(appID);
+ Assert.assertNotNull("app is null", app);
+ Assert.assertEquals("app id doesn't match", appID, app.getApplicationId());
+ Assert.assertEquals("app name doesn't matches", appOrig.getName(), app.getName());
+ ((Service)rmContext.getDispatcher()).stop();
+ }
+
+}
diff --git a/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java b/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
new file mode 100644
index 0000000000..0ef02381af
--- /dev/null
+++ b/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
@@ -0,0 +1,106 @@
+package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.MockApps;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+
+public class MockRMApp implements RMApp {
+ static final int DT = 1000000; // ms
+
+ String user = MockApps.newUserName();
+ String name = MockApps.newAppName();
+ String queue = MockApps.newQueue();
+ long start = System.currentTimeMillis() - (int) (Math.random() * DT);
+ long finish = 0;
+ RMAppState state = RMAppState.NEW;
+ int failCount = 0;
+ ApplicationId id;
+
+ public MockRMApp(int newid, long time, RMAppState newState) {
+ finish = time;
+ id = MockApps.newAppID(newid);
+ state = newState;
+ }
+
+ public MockRMApp(int newid, long time, RMAppState newState, String userName) {
+ this(newid, time, newState);
+ user = userName;
+ }
+
+ @Override
+ public ApplicationId getApplicationId() {
+ return id;
+ }
+
+ @Override
+ public RMAppState getState() {
+ return state;
+ }
+
+ @Override
+ public String getUser() {
+ return user;
+ }
+
+ @Override
+ public float getProgress() {
+ return (float) 0.0;
+ }
+
+ @Override
+ public RMAppAttempt getRMAppAttempt(ApplicationAttemptId appAttemptId) {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
+
+ @Override
+ public String getQueue() {
+ return queue;
+ }
+
+ @Override
+ public String getName() {
+ return name;
+ }
+
+ @Override
+ public RMAppAttempt getCurrentAppAttempt() {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
+
+ @Override
+ public ApplicationReport createAndGetApplicationReport() {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
+
+ @Override
+ public ApplicationStore getApplicationStore() {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
+
+ @Override
+ public long getFinishTime() {
+ return finish;
+ }
+
+ @Override
+ public long getStartTime() {
+ return start;
+ }
+
+ @Override
+ public String getTrackingUrl() {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
+
+ @Override
+ public StringBuilder getDiagnostics() {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
+
+ public void handle(RMAppEvent event) {
+ };
+
+}
diff --git a/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java b/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java
index a489ced3d4..0614f8107b 100644
--- a/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java
+++ b/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java
@@ -262,6 +262,18 @@ public class TestRMAppTransitions {
assertKilled(application);
}
+ @Test
+ public void testAppNewReject() throws IOException {
+ LOG.info("--- START: testAppNewReject ---");
+
+ RMApp application = createNewTestApp();
+ // NEW => FAILED event RMAppEventType.APP_REJECTED
+ String rejectedText = "Test Application Rejected";
+ RMAppEvent event = new RMAppRejectedEvent(application.getApplicationId(), rejectedText);
+ application.handle(event);
+ assertFailed(application, rejectedText);
+ }
+
@Test
public void testAppSubmittedRejected() throws IOException {
LOG.info("--- START: testAppSubmittedRejected ---");