diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterClientRMService.java index 73cc18558d..bbb8047d98 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterClientRMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterClientRMService.java @@ -430,13 +430,15 @@ public GetAllResourceTypeInfoResponse getResourceTypeInfo( return pipeline.getRootInterceptor().getResourceTypeInfo(request); } - private RequestInterceptorChainWrapper getInterceptorChain() + @VisibleForTesting + protected RequestInterceptorChainWrapper getInterceptorChain() throws IOException { String user = UserGroupInformation.getCurrentUser().getUserName(); - if (!userPipelineMap.containsKey(user)) { - initializePipeline(user); + RequestInterceptorChainWrapper chain = userPipelineMap.get(user); + if (chain != null && chain.getRootInterceptor() != null) { + return chain; } - return userPipelineMap.get(user); + return initializePipeline(user); } /** @@ -503,36 +505,33 @@ protected ClientRequestInterceptor createRequestInterceptorChain() { * * @param user */ - private void initializePipeline(String user) { - RequestInterceptorChainWrapper chainWrapper = null; + private RequestInterceptorChainWrapper initializePipeline(String user) { synchronized (this.userPipelineMap) { if (this.userPipelineMap.containsKey(user)) { LOG.info("Request to start an already existing user: {}" + " was received, so ignoring.", user); - return; + return userPipelineMap.get(user); + } + + RequestInterceptorChainWrapper chainWrapper = + new RequestInterceptorChainWrapper(); + try { + // We should init the pipeline instance after it is created and then + // add to the map, to ensure thread safe. + LOG.info("Initializing request processing pipeline for application " + + "for the user: {}", user); + + ClientRequestInterceptor interceptorChain = + this.createRequestInterceptorChain(); + interceptorChain.init(user); + chainWrapper.init(interceptorChain); + } catch (Exception e) { + LOG.error("Init ClientRequestInterceptor error for user: " + user, e); + throw e; } - chainWrapper = new RequestInterceptorChainWrapper(); this.userPipelineMap.put(user, chainWrapper); - } - - // We register the pipeline instance in the map first and then initialize it - // later because chain initialization can be expensive and we would like to - // release the lock as soon as possible to prevent other applications from - // blocking when one application's chain is initializing - LOG.info("Initializing request processing pipeline for application " - + "for the user: {}", user); - - try { - ClientRequestInterceptor interceptorChain = - this.createRequestInterceptorChain(); - interceptorChain.init(user); - chainWrapper.init(interceptorChain); - } catch (Exception e) { - synchronized (this.userPipelineMap) { - this.userPipelineMap.remove(user); - } - throw e; + return chainWrapper; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/RouterRMAdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/RouterRMAdminService.java index b8b7ad818f..ef30613f50 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/RouterRMAdminService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/RouterRMAdminService.java @@ -165,13 +165,15 @@ private List getInterceptorClassNames(Configuration conf) { return interceptorClassNames; } - private RequestInterceptorChainWrapper getInterceptorChain() + @VisibleForTesting + protected RequestInterceptorChainWrapper getInterceptorChain() throws IOException { String user = UserGroupInformation.getCurrentUser().getUserName(); - if (!userPipelineMap.containsKey(user)) { - initializePipeline(user); + RequestInterceptorChainWrapper chain = userPipelineMap.get(user); + if (chain != null && chain.getRootInterceptor() != null) { + return chain; } - return userPipelineMap.get(user); + return initializePipeline(user); } /** @@ -239,35 +241,32 @@ protected RMAdminRequestInterceptor createRequestInterceptorChain() { * * @param user */ - private void initializePipeline(String user) { - RequestInterceptorChainWrapper chainWrapper = null; + private RequestInterceptorChainWrapper initializePipeline(String user) { synchronized (this.userPipelineMap) { if (this.userPipelineMap.containsKey(user)) { LOG.info("Request to start an already existing user: {}" + " was received, so ignoring.", user); - return; + return userPipelineMap.get(user); + } + + RequestInterceptorChainWrapper chainWrapper = + new RequestInterceptorChainWrapper(); + try { + // We should init the pipeline instance after it is created and then + // add to the map, to ensure thread safe. + LOG.info("Initializing request processing pipeline for user: {}", user); + + RMAdminRequestInterceptor interceptorChain = + this.createRequestInterceptorChain(); + interceptorChain.init(user); + chainWrapper.init(interceptorChain); + } catch (Exception e) { + LOG.error("Init RMAdminRequestInterceptor error for user: " + user, e); + throw e; } - chainWrapper = new RequestInterceptorChainWrapper(); this.userPipelineMap.put(user, chainWrapper); - } - - // We register the pipeline instance in the map first and then initialize it - // later because chain initialization can be expensive and we would like to - // release the lock as soon as possible to prevent other applications from - // blocking when one application's chain is initializing - LOG.info("Initializing request processing pipeline for the user: {}", user); - - try { - RMAdminRequestInterceptor interceptorChain = - this.createRequestInterceptorChain(); - interceptorChain.init(user); - chainWrapper.init(interceptorChain); - } catch (Exception e) { - synchronized (this.userPipelineMap) { - this.userPipelineMap.remove(user); - } - throw e; + return chainWrapper; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServices.java index ae57f1cdc8..49de588ba8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServices.java @@ -173,10 +173,11 @@ protected RequestInterceptorChainWrapper getInterceptorChain( } catch (IOException e) { LOG.error("Cannot get user: {}", e.getMessage()); } - if (!userPipelineMap.containsKey(user)) { - initializePipeline(user); + RequestInterceptorChainWrapper chain = userPipelineMap.get(user); + if (chain != null && chain.getRootInterceptor() != null) { + return chain; } - return userPipelineMap.get(user); + return initializePipeline(user); } /** @@ -242,35 +243,32 @@ protected RESTRequestInterceptor createRequestInterceptorChain() { * * @param user */ - private void initializePipeline(String user) { - RequestInterceptorChainWrapper chainWrapper = null; + private RequestInterceptorChainWrapper initializePipeline(String user) { synchronized (this.userPipelineMap) { if (this.userPipelineMap.containsKey(user)) { LOG.info("Request to start an already existing user: {}" + " was received, so ignoring.", user); - return; + return userPipelineMap.get(user); + } + + RequestInterceptorChainWrapper chainWrapper = + new RequestInterceptorChainWrapper(); + try { + // We should init the pipeline instance after it is created and then + // add to the map, to ensure thread safe. + LOG.info("Initializing request processing pipeline for user: {}", user); + + RESTRequestInterceptor interceptorChain = + this.createRequestInterceptorChain(); + interceptorChain.init(user); + chainWrapper.init(interceptorChain); + } catch (Exception e) { + LOG.error("Init RESTRequestInterceptor error for user: " + user, e); + throw e; } - chainWrapper = new RequestInterceptorChainWrapper(); this.userPipelineMap.put(user, chainWrapper); - } - - // We register the pipeline instance in the map first and then initialize it - // later because chain initialization can be expensive and we would like to - // release the lock as soon as possible to prevent other applications from - // blocking when one application's chain is initializing - LOG.info("Initializing request processing pipeline for the user: {}", user); - - try { - RESTRequestInterceptor interceptorChain = - this.createRequestInterceptorChain(); - interceptorChain.init(user); - chainWrapper.init(interceptorChain); - } catch (Exception e) { - synchronized (this.userPipelineMap) { - this.userPipelineMap.remove(user); - } - throw e; + return chainWrapper; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestRouterClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestRouterClientRMService.java index a9c37293f6..b03059decf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestRouterClientRMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestRouterClientRMService.java @@ -19,8 +19,10 @@ package org.apache.hadoop.yarn.server.router.clientrm; import java.io.IOException; +import java.security.PrivilegedExceptionAction; import java.util.Map; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse; @@ -207,4 +209,62 @@ public void testUsersChainMapWithLRUCache() Assert.assertNull("test2 should have been evicted", chain); } + /** + * This test validates if the ClientRequestInterceptor chain for the user + * can build and init correctly when a multi-client process begins to + * request RouterClientRMService for the same user simultaneously. + */ + @Test + public void testClientPipelineConcurrent() throws InterruptedException { + final String user = "test1"; + + /* + * ClientTestThread is a thread to simulate a client request to get a + * ClientRequestInterceptor for the user. + */ + class ClientTestThread extends Thread { + private ClientRequestInterceptor interceptor; + @Override public void run() { + try { + interceptor = pipeline(); + } catch (IOException | InterruptedException e) { + e.printStackTrace(); + } + } + private ClientRequestInterceptor pipeline() + throws IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user).doAs( + new PrivilegedExceptionAction() { + @Override + public ClientRequestInterceptor run() throws Exception { + RequestInterceptorChainWrapper wrapper = + getRouterClientRMService().getInterceptorChain(); + ClientRequestInterceptor interceptor = + wrapper.getRootInterceptor(); + Assert.assertNotNull(interceptor); + LOG.info("init client interceptor success for user " + user); + return interceptor; + } + }); + } + } + + /* + * We start the first thread. It should not finish initing a chainWrapper + * before the other thread starts. In this way, the second thread can + * init at the same time of the first one. In the end, we validate that + * the 2 threads get the same chainWrapper without going into error. + */ + ClientTestThread client1 = new ClientTestThread(); + ClientTestThread client2 = new ClientTestThread(); + client1.start(); + client2.start(); + client1.join(); + client2.join(); + + Assert.assertNotNull(client1.interceptor); + Assert.assertNotNull(client2.interceptor); + Assert.assertTrue(client1.interceptor == client2.interceptor); + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/TestRouterRMAdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/TestRouterRMAdminService.java index 11786e6f98..07ef73c3cd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/TestRouterRMAdminService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/TestRouterRMAdminService.java @@ -19,8 +19,10 @@ package org.apache.hadoop.yarn.server.router.rmadmin; import java.io.IOException; +import java.security.PrivilegedExceptionAction; import java.util.Map; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesResponse; @@ -216,4 +218,62 @@ public void testUsersChainMapWithLRUCache() Assert.assertNull("test2 should have been evicted", chain); } + /** + * This test validates if the RMAdminRequestInterceptor chain for the user + * can build and init correctly when a multi-client process begins to + * request RouterRMAdminService for the same user simultaneously. + */ + @Test + public void testRMAdminPipelineConcurrent() throws InterruptedException { + final String user = "test1"; + + /* + * ClientTestThread is a thread to simulate a client request to get a + * RMAdminRequestInterceptor for the user. + */ + class ClientTestThread extends Thread { + private RMAdminRequestInterceptor interceptor; + @Override public void run() { + try { + interceptor = pipeline(); + } catch (IOException | InterruptedException e) { + e.printStackTrace(); + } + } + private RMAdminRequestInterceptor pipeline() + throws IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user).doAs( + new PrivilegedExceptionAction() { + @Override + public RMAdminRequestInterceptor run() throws Exception { + RequestInterceptorChainWrapper wrapper = + getRouterRMAdminService().getInterceptorChain(); + RMAdminRequestInterceptor interceptor = + wrapper.getRootInterceptor(); + Assert.assertNotNull(interceptor); + LOG.info("init rm admin interceptor success for user" + user); + return interceptor; + } + }); + } + } + + /* + * We start the first thread. It should not finish initing a chainWrapper + * before the other thread starts. In this way, the second thread can + * init at the same time of the first one. In the end, we validate that + * the 2 threads get the same chainWrapper without going into error. + */ + ClientTestThread client1 = new ClientTestThread(); + ClientTestThread client2 = new ClientTestThread(); + client1.start(); + client2.start(); + client1.join(); + client2.join(); + + Assert.assertNotNull(client1.interceptor); + Assert.assertNotNull(client2.interceptor); + Assert.assertTrue(client1.interceptor == client2.interceptor); + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestRouterWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestRouterWebServices.java index c96575c21a..14652435da 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestRouterWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestRouterWebServices.java @@ -19,10 +19,12 @@ package org.apache.hadoop.yarn.server.router.webapp; import java.io.IOException; +import java.security.PrivilegedExceptionAction; import java.util.Map; import javax.ws.rs.core.Response; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ActivitiesInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppActivitiesInfo; @@ -49,12 +51,17 @@ import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo; import org.junit.Assert; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Test class to validate the WebService interceptor model inside the Router. */ public class TestRouterWebServices extends BaseRouterWebServicesTest { + private static final Logger LOG = + LoggerFactory.getLogger(TestRouterWebServices.class); + private String user = "test1"; /** @@ -266,4 +273,62 @@ public void testUsersChainMapWithLRUCache() Assert.assertNull("test2 should have been evicted", chain); } + /** + * This test validates if the RESTRequestInterceptor chain for the user + * can build and init correctly when a multi-client process begins to + * request RouterWebServices for the same user simultaneously. + */ + @Test + public void testWebPipelineConcurrent() throws InterruptedException { + final String user = "test1"; + + /* + * ClientTestThread is a thread to simulate a client request to get a + * RESTRequestInterceptor for the user. + */ + class ClientTestThread extends Thread { + private RESTRequestInterceptor interceptor; + @Override public void run() { + try { + interceptor = pipeline(); + } catch (IOException | InterruptedException e) { + e.printStackTrace(); + } + } + private RESTRequestInterceptor pipeline() + throws IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user).doAs( + new PrivilegedExceptionAction() { + @Override + public RESTRequestInterceptor run() throws Exception { + RequestInterceptorChainWrapper wrapper = + getInterceptorChain(user); + RESTRequestInterceptor interceptor = + wrapper.getRootInterceptor(); + Assert.assertNotNull(interceptor); + LOG.info("init web interceptor success for user" + user); + return interceptor; + } + }); + } + } + + /* + * We start the first thread. It should not finish initing a chainWrapper + * before the other thread starts. In this way, the second thread can + * init at the same time of the first one. In the end, we validate that + * the 2 threads get the same chainWrapper without going into error. + */ + ClientTestThread client1 = new ClientTestThread(); + ClientTestThread client2 = new ClientTestThread(); + client1.start(); + client2.start(); + client1.join(); + client2.join(); + + Assert.assertNotNull(client1.interceptor); + Assert.assertNotNull(client2.interceptor); + Assert.assertTrue(client1.interceptor == client2.interceptor); + } + }