From cd9ff27ffc9369820d0c39200a11bf00e6a767c8 Mon Sep 17 00:00:00 2001 From: Subru Krishnan Date: Mon, 8 May 2017 16:41:30 -0700 Subject: [PATCH] YARN-6234. Support multiple attempts on the node when AMRMProxy is enabled. (Giovanni Matteo Fumarola via Subru). --- .../amrmproxy/AMRMProxyService.java | 32 ++++++++++++++--- .../amrmproxy/TestAMRMProxyService.java | 36 +++++++++++++++++++ 2 files changed, 63 insertions(+), 5 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java index 9f2d9a1b1a..2696bca6c4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java @@ -270,18 +270,40 @@ public void processApplicationStartRequest(StartContainerRequest request) * @param user * @param amrmToken */ - protected void initializePipeline( - ApplicationAttemptId applicationAttemptId, String user, - Token amrmToken, + protected void initializePipeline(ApplicationAttemptId applicationAttemptId, + String user, Token amrmToken, Token localToken) { RequestInterceptorChainWrapper chainWrapper = null; synchronized (applPipelineMap) { - if (applPipelineMap.containsKey(applicationAttemptId.getApplicationId())) { + if (applPipelineMap + .containsKey(applicationAttemptId.getApplicationId())) { LOG.warn("Request to start an already existing appId was received. " + " This can happen if an application failed and a new attempt " + "was created on this machine. ApplicationId: " + applicationAttemptId.toString()); - return; + + RequestInterceptorChainWrapper chainWrapperBackup = + this.applPipelineMap.get(applicationAttemptId.getApplicationId()); + if (chainWrapperBackup != null + && chainWrapperBackup.getApplicationAttemptId() != null + && !chainWrapperBackup.getApplicationAttemptId() + .equals(applicationAttemptId)) { + // Remove the existing pipeline + LOG.info("Remove the previous pipeline for ApplicationId: " + + applicationAttemptId.toString()); + RequestInterceptorChainWrapper pipeline = + applPipelineMap.remove(applicationAttemptId.getApplicationId()); + try { + pipeline.getRootInterceptor().shutdown(); + } catch (Throwable ex) { + LOG.warn( + "Failed to shutdown the request processing pipeline for app:" + + applicationAttemptId.getApplicationId(), + ex); + } + } else { + return; + } } chainWrapper = new RequestInterceptorChainWrapper(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java index 7fffddf683..837278c234 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.nodemanager.amrmproxy; +import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -27,10 +28,14 @@ import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyService.RequestInterceptorChainWrapper; import org.apache.hadoop.yarn.util.Records; import org.junit.Assert; import org.junit.Test; @@ -380,6 +385,37 @@ public Integer invoke(Integer testAppId) { } } + @Test + public void testMultipleAttemptsSameNode() + throws YarnException, IOException, Exception { + + String user = "hadoop"; + ApplicationId appId = ApplicationId.newInstance(1, 1); + ApplicationAttemptId applicationAttemptId; + + // First Attempt + + RegisterApplicationMasterResponse response1 = + registerApplicationMaster(appId.getId()); + Assert.assertNotNull(response1); + + AllocateResponse allocateResponse = allocate(appId.getId()); + Assert.assertNotNull(allocateResponse); + + // Second Attempt + + applicationAttemptId = ApplicationAttemptId.newInstance(appId, 2); + getAMRMProxyService().initializePipeline(applicationAttemptId, user, null, + null); + + RequestInterceptorChainWrapper chain2 = + getAMRMProxyService().getPipelines().get(appId); + Assert.assertEquals(applicationAttemptId, chain2.getApplicationAttemptId()); + + allocateResponse = allocate(appId.getId()); + Assert.assertNotNull(allocateResponse); + } + private List getContainersAndAssert(int appId, int numberOfResourceRequests) throws Exception { AllocateRequest allocateRequest =