From 3e06a5dcea8224ba71aec284df23b47d536bb06d Mon Sep 17 00:00:00 2001 From: Jonathan Hung Date: Mon, 30 Jul 2018 17:41:01 -0700 Subject: [PATCH] YARN-7974. Allow updating application tracking url after registration. Contributed by Jonathan Hung --- .../api/protocolrecords/AllocateRequest.java | 47 ++++++++++- .../src/main/proto/yarn_service_protos.proto | 1 + .../hadoop/yarn/client/api/AMRMClient.java | 11 +++ .../client/api/async/AMRMClientAsync.java | 11 +++ .../api/async/impl/AMRMClientAsyncImpl.java | 5 ++ .../yarn/client/api/impl/AMRMClientImpl.java | 11 +++ .../yarn/client/api/impl/TestAMRMClient.java | 77 +++++++++++++++++++ .../impl/pb/AllocateRequestPBImpl.java | 27 ++++++- .../resourcemanager/DefaultAMSProcessor.java | 2 +- .../rmapp/attempt/RMAppAttemptImpl.java | 20 +++++ .../event/RMAppAttemptStatusupdateEvent.java | 11 +++ .../TestApplicationMasterService.java | 34 ++++++++ .../server/resourcemanager/TestRMRestart.java | 45 +++++++++++ 13 files changed, 298 insertions(+), 4 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateRequest.java index eee50e3c1e..799088b543 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateRequest.java @@ -73,7 +73,21 @@ public static AllocateRequest newInstance(int responseID, float appProgress, .releaseList(containersToBeReleased) .resourceBlacklistRequest(resourceBlacklistRequest).build(); } - + + @Public + @Unstable + public static AllocateRequest newInstance(int responseID, float appProgress, + List resourceAsk, + List containersToBeReleased, + ResourceBlacklistRequest resourceBlacklistRequest, + String trackingUrl) { + return AllocateRequest.newBuilder().responseId(responseID) + .progress(appProgress).askList(resourceAsk) + .releaseList(containersToBeReleased) + .resourceBlacklistRequest(resourceBlacklistRequest) + .trackingUrl(trackingUrl).build(); + } + @Public @Unstable public static AllocateRequest newInstance(int responseID, float appProgress, @@ -240,6 +254,22 @@ public void setSchedulingRequests( List schedulingRequests) { } + /** + * Get the tracking url update for this heartbeat. + * @return tracking url to update this application with + */ + @Public + @Unstable + public abstract String getTrackingUrl(); + + /** + * Set the new tracking url for this application. + * @param trackingUrl the new tracking url + */ + @Public + @Unstable + public abstract void setTrackingUrl(String trackingUrl); + @Public @Unstable public static AllocateRequestBuilder newBuilder() { @@ -355,6 +385,19 @@ public AllocateRequestBuilder schedulingRequests( return this; } + /** + * Set the trackingUrl of the request. + * @see AllocateRequest#setTrackingUrl(String) + * @param trackingUrl new tracking url + * @return {@link AllocateRequestBuilder} + */ + @Public + @Unstable + public AllocateRequestBuilder trackingUrl(String trackingUrl) { + allocateRequest.setTrackingUrl(trackingUrl); + return this; + } + /** * Return generated {@link AllocateRequest} object. * @return {@link AllocateRequest} @@ -365,4 +408,4 @@ public AllocateRequest build() { return allocateRequest; } } -} \ No newline at end of file +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto index 92a65adbed..acd452dc79 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto @@ -92,6 +92,7 @@ message AllocateRequestProto { optional float progress = 5; repeated UpdateContainerRequestProto update_requests = 7; repeated SchedulingRequestProto scheduling_requests = 10; + optional string tracking_url = 11; } message NMTokenProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java index 32aa21d52b..59b33530b0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java @@ -804,6 +804,17 @@ public TimelineV2Client getRegisteredTimelineV2Client() { return this.timelineV2Client; } + /** + * Update application's tracking url on next heartbeat. + * + * @param trackingUrl new tracking url for this application + */ + @Public + @InterfaceStability.Unstable + public void updateTrackingUrl(String trackingUrl) { + // Unimplemented. + } + /** * Wait for check to return true for each 1000 ms. * See also {@link #waitFor(java.util.function.Supplier, int)} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java index 0af687bd58..3dd2f718ba 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java @@ -412,6 +412,17 @@ public TimelineV2Client getRegisteredTimelineV2Client() { public abstract void updateBlacklist(List blacklistAdditions, List blacklistRemovals); + /** + * Update application's tracking url on next heartbeat. + * + * @param trackingUrl new tracking url for this application + */ + @Public + @Unstable + public void updateTrackingUrl(String trackingUrl) { + // Unimplemented. + } + /** * Wait for check to return true for each 1000 ms. * See also {@link #waitFor(java.util.function.Supplier, int)} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java index 4f04b66e10..3cf2c3496e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java @@ -286,6 +286,11 @@ public void updateBlacklist(List blacklistAdditions, List blacklistRemovals) { client.updateBlacklist(blacklistAdditions, blacklistRemovals); } + + @Override + public void updateTrackingUrl(String trackingUrl) { + client.updateTrackingUrl(trackingUrl); + } private class HeartbeatThread extends Thread { public HeartbeatThread() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java index 7265d24ac0..6dcecde09a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java @@ -99,6 +99,7 @@ public class AMRMClientImpl extends AMRMClient { protected String appHostName; protected int appHostPort; protected String appTrackingUrl; + protected String newTrackingUrl; protected ApplicationMasterProtocol rmClient; protected Resource clusterAvailableResources; @@ -308,6 +309,11 @@ public AllocateResponse allocate(float progressIndicator) .releaseList(releaseList).updateRequests(updateList) .schedulingRequests(schedulingRequestList).build(); + if (this.newTrackingUrl != null) { + allocateRequest.setTrackingUrl(this.newTrackingUrl); + this.appTrackingUrl = this.newTrackingUrl; + this.newTrackingUrl = null; + } // clear blacklistAdditions and blacklistRemovals before // unsynchronized part blacklistAdditions.clear(); @@ -1008,6 +1014,11 @@ public synchronized void updateBlacklist(List blacklistAdditions, } } + @Override + public synchronized void updateTrackingUrl(String trackingUrl) { + this.newTrackingUrl = trackingUrl; + } + private void updateAMRMToken(Token token) throws IOException { org.apache.hadoop.security.token.Token amrmToken = new org.apache.hadoop.security.token.Token(token diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java index 8dda8b4d6b..cf837794f0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java @@ -20,10 +20,12 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.io.IOException; @@ -79,6 +81,7 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import org.mockito.ArgumentCaptor; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import org.eclipse.jetty.util.log.Log; @@ -1994,4 +1997,78 @@ public void testGetMatchingFitWithProfiles() throws Exception { } } } + + @Test(timeout = 60000) + public void testNoUpdateTrackingUrl() { + try { + AMRMClientImpl amClient = null; + amClient = new AMRMClientImpl<>(); + amClient.init(conf); + amClient.start(); + amClient.registerApplicationMaster("Host", 10000, ""); + + assertEquals("", amClient.appTrackingUrl); + + ApplicationMasterProtocol mockRM = mock(ApplicationMasterProtocol.class); + AllocateResponse mockResponse = mock(AllocateResponse.class); + when(mockRM.allocate(any(AllocateRequest.class))) + .thenReturn(mockResponse); + ApplicationMasterProtocol realRM = amClient.rmClient; + amClient.rmClient = mockRM; + // Do allocate without updated tracking url + amClient.allocate(0.1f); + ArgumentCaptor argument = + ArgumentCaptor.forClass(AllocateRequest.class); + verify(mockRM).allocate(argument.capture()); + assertNull(argument.getValue().getTrackingUrl()); + + amClient.rmClient = realRM; + amClient + .unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, null, + null); + } catch (IOException | YarnException e) { + throw new AssertionError( + "testNoUpdateTrackingUrl unexpectedly threw exception: " + e); + } + } + + @Test(timeout = 60000) + public void testUpdateTrackingUrl() { + try { + AMRMClientImpl amClient = null; + amClient = new AMRMClientImpl<>(); + amClient.init(conf); + amClient.start(); + amClient.registerApplicationMaster("Host", 10000, ""); + + String trackingUrl = "hadoop.apache.org"; + assertEquals("", amClient.appTrackingUrl); + + ApplicationMasterProtocol mockRM = mock(ApplicationMasterProtocol.class); + AllocateResponse mockResponse = mock(AllocateResponse.class); + when(mockRM.allocate(any(AllocateRequest.class))) + .thenReturn(mockResponse); + ApplicationMasterProtocol realRM = amClient.rmClient; + amClient.rmClient = mockRM; + // Do allocate with updated tracking url + amClient.updateTrackingUrl(trackingUrl); + assertEquals(trackingUrl, amClient.newTrackingUrl); + assertEquals("", amClient.appTrackingUrl); + amClient.allocate(0.1f); + assertNull(amClient.newTrackingUrl); + assertEquals(trackingUrl, amClient.appTrackingUrl); + ArgumentCaptor argument + = ArgumentCaptor.forClass(AllocateRequest.class); + verify(mockRM).allocate(argument.capture()); + assertEquals(trackingUrl, argument.getValue().getTrackingUrl()); + + amClient.rmClient = realRM; + amClient + .unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, null, + null); + } catch (IOException | YarnException e) { + throw new AssertionError( + "testUpdateTrackingUrl unexpectedly threw exception: " + e); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateRequestPBImpl.java index 50672a3005..b5360a5dfb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateRequestPBImpl.java @@ -58,6 +58,7 @@ public class AllocateRequestPBImpl extends AllocateRequest { private List updateRequests = null; private List schedulingRequests = null; private ResourceBlacklistRequest blacklistRequest = null; + private String trackingUrl = null; public AllocateRequestPBImpl() { builder = AllocateRequestProto.newBuilder(); @@ -111,6 +112,9 @@ private void mergeLocalToBuilder() { if (this.blacklistRequest != null) { builder.setBlacklistRequest(convertToProtoFormat(this.blacklistRequest)); } + if (this.trackingUrl != null) { + builder.setTrackingUrl(this.trackingUrl); + } } private void mergeLocalToProto() { @@ -398,7 +402,28 @@ private void initReleases() { this.release.add(convertFromProtoFormat(c)); } } - + + @Override + public String getTrackingUrl() { + AllocateRequestProtoOrBuilder p = viaProto ? proto : builder; + if (this.trackingUrl != null) { + return this.trackingUrl; + } + if (p.hasTrackingUrl()) { + this.trackingUrl = p.getTrackingUrl(); + } + return this.trackingUrl; + } + + @Override + public void setTrackingUrl(String trackingUrl) { + maybeInitBuilder(); + if (trackingUrl == null) { + builder.clearTrackingUrl(); + } + this.trackingUrl = trackingUrl; + } + private void addReleasesToProto() { maybeInitBuilder(); builder.clearRelease(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java index 43f73e48a4..4cd5925f24 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java @@ -401,7 +401,7 @@ private void handleProgress(ApplicationAttemptId appAttemptId, // Send the status update to the appAttempt. getRmContext().getDispatcher().getEventHandler().handle( new RMAppAttemptStatusupdateEvent(appAttemptId, request - .getProgress())); + .getProgress(), request.getTrackingUrl())); } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index 32f275f807..3ec9c49818 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -1823,6 +1823,26 @@ public void transition(RMAppAttemptImpl appAttempt, // Update progress appAttempt.progress = statusUpdateEvent.getProgress(); + // Update tracking url if changed and save it to state store + String newTrackingUrl = statusUpdateEvent.getTrackingUrl(); + if (newTrackingUrl != null && + !newTrackingUrl.equals(appAttempt.originalTrackingUrl)) { + appAttempt.originalTrackingUrl = newTrackingUrl; + ApplicationAttemptStateData attemptState = ApplicationAttemptStateData + .newInstance(appAttempt.applicationAttemptId, + appAttempt.getMasterContainer(), + appAttempt.rmContext.getStateStore() + .getCredentialsFromAppAttempt(appAttempt), + appAttempt.startTime, appAttempt.recoveredFinalState, + newTrackingUrl, appAttempt.getDiagnostics(), null, + ContainerExitStatus.INVALID, appAttempt.getFinishTime(), + appAttempt.attemptMetrics.getAggregateAppResourceUsage() + .getResourceUsageSecondsMap(), + appAttempt.attemptMetrics.getPreemptedResourceSecondsMap()); + appAttempt.rmContext.getStateStore() + .updateApplicationAttemptState(attemptState); + } + // Ping to AMLivelinessMonitor appAttempt.rmContext.getAMLivelinessMonitor().receivedPing( statusUpdateEvent.getApplicationAttemptId()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptStatusupdateEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptStatusupdateEvent.java index b1b63b14ef..1b7442d8a3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptStatusupdateEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptStatusupdateEvent.java @@ -25,15 +25,26 @@ public class RMAppAttemptStatusupdateEvent extends RMAppAttemptEvent { private final float progress; + private final String trackingUrl; public RMAppAttemptStatusupdateEvent(ApplicationAttemptId appAttemptId, float progress) { + this(appAttemptId, progress, null); + } + + public RMAppAttemptStatusupdateEvent(ApplicationAttemptId appAttemptId, + float progress, String trackingUrl) { super(appAttemptId, RMAppAttemptEventType.STATUS_UPDATE); this.progress = progress; + this.trackingUrl = trackingUrl; } public float getProgress() { return this.progress; } + public String getTrackingUrl() { + return this.trackingUrl; + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java index 9696741f1e..562ba5d506 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java @@ -956,4 +956,38 @@ private void sentRMContainerLaunched(MockRM rm, ContainerId containerId) { fail("Cannot find RMContainer"); } } + + @Test(timeout = 300000) + public void testUpdateTrackingUrl() throws Exception { + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + MockRM rm = new MockRM(conf); + rm.start(); + + // Register node1 + MockNM nm1 = rm.registerNode("127.0.0.1:1234", 6 * GB); + + RMApp app1 = rm.submitApp(2048); + + nm1.nodeHeartbeat(true); + RMAppAttempt attempt1 = app1.getCurrentAppAttempt(); + MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId()); + am1.registerAppAttempt(); + Assert.assertEquals("N/A", rm.getRMContext().getRMApps().get( + app1.getApplicationId()).getOriginalTrackingUrl()); + + AllocateRequestPBImpl allocateRequest = new AllocateRequestPBImpl(); + String newTrackingUrl = "hadoop.apache.org"; + allocateRequest.setTrackingUrl(newTrackingUrl); + + am1.allocate(allocateRequest); + Assert.assertEquals(newTrackingUrl, rm.getRMContext().getRMApps().get( + app1.getApplicationId()).getOriginalTrackingUrl()); + + // Send it again + am1.allocate(allocateRequest); + Assert.assertEquals(newTrackingUrl, rm.getRMContext().getRMApps().get( + app1.getApplicationId()).getOriginalTrackingUrl()); + rm.stop(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index 07c5268f53..9aa5c531ae 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -2698,6 +2698,51 @@ public void testRMRestartAfterPriorityChangesInAllocatedResponse() rm2.stop(); } + @Test(timeout = 20000) + public void testRMRestartAfterUpdateTrackingUrl() throws Exception { + MockRM rm = new MockRM(conf); + rm.start(); + + MemoryRMStateStore memStore = (MemoryRMStateStore) rm.getRMStateStore(); + + // Register node1 + MockNM nm1 = rm.registerNode("127.0.0.1:1234", 6 * 1024); + + RMApp app1 = rm.submitApp(2048); + + nm1.nodeHeartbeat(true); + RMAppAttempt attempt1 = app1.getCurrentAppAttempt(); + MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId()); + am1.registerAppAttempt(); + + AllocateRequestPBImpl allocateRequest = new AllocateRequestPBImpl(); + String newTrackingUrl = "hadoop.apache.org"; + allocateRequest.setTrackingUrl(newTrackingUrl); + + am1.allocate(allocateRequest); + // Check in-memory and stored tracking url + Assert.assertEquals(newTrackingUrl, rm.getRMContext().getRMApps().get( + app1.getApplicationId()).getOriginalTrackingUrl()); + Assert.assertEquals(newTrackingUrl, rm.getRMContext().getRMApps().get( + app1.getApplicationId()).getCurrentAppAttempt() + .getOriginalTrackingUrl()); + Assert.assertEquals(newTrackingUrl, memStore.getState() + .getApplicationState().get(app1.getApplicationId()) + .getAttempt(attempt1.getAppAttemptId()).getFinalTrackingUrl()); + + // Start new RM, should recover updated tracking url + MockRM rm2 = new MockRM(conf, memStore); + rm2.start(); + Assert.assertEquals(newTrackingUrl, rm.getRMContext().getRMApps().get( + app1.getApplicationId()).getOriginalTrackingUrl()); + Assert.assertEquals(newTrackingUrl, rm.getRMContext().getRMApps().get( + app1.getApplicationId()).getCurrentAppAttempt() + .getOriginalTrackingUrl()); + + rm.stop(); + rm2.stop(); + } + private Credentials getCreds() throws IOException { Credentials ts = new Credentials(); DataOutputBuffer dob = new DataOutputBuffer();