From 702236129b930a799a5a3295f0aa0dc7b619c354 Mon Sep 17 00:00:00 2001 From: Sangjin Lee Date: Wed, 25 May 2016 16:56:49 -0700 Subject: [PATCH] YARN-5095. flow activities and flow runs are populated with wrong timestamp when RM restarts w/ recovery enabled (Varun Saxena via sjlee) --- .../server/resourcemanager/RMAppManager.java | 12 ++-- .../resourcemanager/rmapp/RMAppImpl.java | 19 +++++- .../server/resourcemanager/TestRMRestart.java | 63 +++++++++++++++++++ 3 files changed, 87 insertions(+), 7 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java index 836fe90eeb..49daedb692 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java @@ -288,8 +288,10 @@ protected void submitApplication( String user) throws YarnException, AccessControlException { ApplicationId applicationId = submissionContext.getApplicationId(); - RMAppImpl application = - createAndPopulateNewRMApp(submissionContext, submitTime, user, false); + // Passing start time as -1. It will be eventually set in RMAppImpl + // constructor. + RMAppImpl application = createAndPopulateNewRMApp( + submissionContext, submitTime, user, false, -1); Credentials credentials = null; try { credentials = parseCredentials(submissionContext); @@ -327,14 +329,14 @@ protected void recoverApplication(ApplicationStateData appState, // create and recover app. RMAppImpl application = createAndPopulateNewRMApp(appContext, appState.getSubmitTime(), - appState.getUser(), true); + appState.getUser(), true, appState.getStartTime()); application.handle(new RMAppRecoverEvent(appId, rmState)); } private RMAppImpl createAndPopulateNewRMApp( ApplicationSubmissionContext submissionContext, long submitTime, - String user, boolean isRecovery) + String user, boolean isRecovery, long startTime) throws YarnException, AccessControlException { // Do queue mapping if (!isRecovery) { @@ -391,7 +393,7 @@ private RMAppImpl createAndPopulateNewRMApp( submissionContext.getQueue(), submissionContext, this.scheduler, this.masterService, submitTime, submissionContext.getApplicationType(), - submissionContext.getApplicationTags(), amReq); + submissionContext.getApplicationTags(), amReq, startTime); // Concurrent app submissions with same applicationId will fail here // Concurrent app submissions with different applicationIds will not // influence each other diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 8463f3a42c..6e448f700b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -416,8 +416,19 @@ public RMAppImpl(ApplicationId applicationId, RMContext rmContext, Configuration config, String name, String user, String queue, ApplicationSubmissionContext submissionContext, YarnScheduler scheduler, ApplicationMasterService masterService, long submitTime, - String applicationType, Set applicationTags, + String applicationType, Set applicationTags, ResourceRequest amReq) { + this(applicationId, rmContext, config, name, user, queue, submissionContext, + scheduler, masterService, submitTime, applicationType, applicationTags, + amReq, -1); + } + + public RMAppImpl(ApplicationId applicationId, RMContext rmContext, + Configuration config, String name, String user, String queue, + ApplicationSubmissionContext submissionContext, YarnScheduler scheduler, + ApplicationMasterService masterService, long submitTime, + String applicationType, Set applicationTags, + ResourceRequest amReq, long startTime) { this.systemClock = SystemClock.getInstance(); @@ -433,7 +444,11 @@ public RMAppImpl(ApplicationId applicationId, RMContext rmContext, this.scheduler = scheduler; this.masterService = masterService; this.submitTime = submitTime; - this.startTime = this.systemClock.getTime(); + if (startTime <= 0) { + this.startTime = this.systemClock.getTime(); + } else { + this.startTime = startTime; + } this.applicationType = applicationType; this.applicationTags = applicationTags; this.amReq = amReq; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index 364f9d19f4..bbb1d15e1c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -109,6 +109,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; +import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.log4j.Level; @@ -1125,6 +1126,68 @@ public void testRMRestartOnMaxAppAttempts() throws Exception { Assert.assertNull(rmAppState.get(app2.getApplicationId()).getState()); } + @Test (timeout = 60000) + public void testRMRestartTimelineCollectorContext() throws Exception { + conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); + conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f); + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + RMState rmState = memStore.getState(); + Map rmAppState = + rmState.getApplicationState(); + MockRM rm1 = null; + MockRM rm2 = null; + try { + rm1 = createMockRM(conf, memStore); + rm1.start(); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); + nm1.registerNode(); + + // submit an app. + RMApp app = rm1.submitApp(200, "name", "user", + new HashMap(), false, "default", -1, + null); + // Check if app info has been saved. + ApplicationStateData appState = rmAppState.get(app.getApplicationId()); + Assert.assertNotNull(appState); + Assert.assertEquals(0, appState.getAttemptCount()); + Assert.assertEquals(appState.getApplicationSubmissionContext() + .getApplicationId(), app.getApplicationSubmissionContext() + .getApplicationId()); + + // Allocate the AM + nm1.nodeHeartbeat(true); + RMAppAttempt attempt = app.getCurrentAppAttempt(); + ApplicationAttemptId attemptId1 = attempt.getAppAttemptId(); + rm1.waitForState(attemptId1, RMAppAttemptState.ALLOCATED); + + ApplicationId appId = app.getApplicationId(); + TimelineCollectorContext contextBeforeRestart = + rm1.getRMContext().getRMTimelineCollectorManager().get(appId). + getTimelineEntityContext(); + + // Restart RM. + rm2 = createMockRM(conf, memStore); + rm2.start(); + Assert.assertEquals(1, rm2.getRMContext().getRMApps().size()); + rm2.waitForState(app.getApplicationId(), RMAppState.ACCEPTED); + TimelineCollectorContext contextAfterRestart = + rm2.getRMContext().getRMTimelineCollectorManager().get(appId). + getTimelineEntityContext(); + Assert.assertEquals("Collector contexts for an app should be same " + + "across restarts", contextBeforeRestart, contextAfterRestart); + } finally { + conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, false); + if (rm1 != null) { + rm1.close(); + } + if (rm2 != null) { + rm2.close(); + } + } + } + @Test (timeout = 60000) public void testDelegationTokenRestoredInDelegationTokenRenewer() throws Exception {