diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml index 15bd1fa1d5..7301e90abf 100755 --- a/hadoop-project/pom.xml +++ b/hadoop-project/pom.xml @@ -322,6 +322,13 @@ ${project.version} + + org.apache.hadoop + hadoop-yarn-server-common + ${project.version} + test-jar + + org.apache.hadoop hadoop-yarn-server-tests diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 7bcb12357d..cf9c237997 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -2637,6 +2637,27 @@ public static boolean isAclEnabled(Configuration conf) { public static final int DEFAULT_FEDERATION_STATESTORE_SQL_MAXCONNECTIONS = 1; + public static final String ROUTER_PREFIX = YARN_PREFIX + "router."; + + public static final String ROUTER_CLIENTRM_PREFIX = + ROUTER_PREFIX + "clientrm."; + + public static final String ROUTER_CLIENTRM_ADDRESS = + ROUTER_CLIENTRM_PREFIX + ".address"; + public static final int DEFAULT_ROUTER_CLIENTRM_PORT = 8050; + public static final String DEFAULT_ROUTER_CLIENTRM_ADDRESS = + "0.0.0.0:" + DEFAULT_ROUTER_CLIENTRM_PORT; + + public static final String ROUTER_CLIENTRM_INTERCEPTOR_CLASS_PIPELINE = + ROUTER_CLIENTRM_PREFIX + "interceptor-class.pipeline"; + public static final String DEFAULT_ROUTER_CLIENTRM_INTERCEPTOR_CLASS = + "org.apache.hadoop.yarn.server.router.clientrm." + + "DefaultClientRequestInterceptor"; + + public static final String ROUTER_CLIENTRM_PIPELINE_CACHE_MAX_SIZE = + ROUTER_CLIENTRM_PREFIX + "cache-max-size"; + public static final int DEFAULT_ROUTER_CLIENTRM_PIPELINE_CACHE_MAX_SIZE = 25; + //////////////////////////////// // Other Configs //////////////////////////////// diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/LRUCacheHashMap.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/LRUCacheHashMap.java new file mode 100644 index 0000000000..7cb4e1b501 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/LRUCacheHashMap.java @@ -0,0 +1,49 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.util; + +import java.util.LinkedHashMap; +import java.util.Map; + +/** + * LRU cache with a configurable maximum cache size and access order. + */ +public class LRUCacheHashMap extends LinkedHashMap { + + private static final long serialVersionUID = 1L; + + // Maximum size of the cache + private int maxSize; + + /** + * Constructor. + * + * @param maxSize max size of the cache + * @param accessOrder true for access-order, false for insertion-order + */ + public LRUCacheHashMap(int maxSize, boolean accessOrder) { + super(maxSize, 0.75f, accessOrder); + this.maxSize = maxSize; + } + + @Override + protected boolean removeEldestEntry(Map.Entry eldest) { + return size() > maxSize; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 6af7321ad8..94dccd1fc5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -3165,6 +3165,24 @@ false + + + The comma separated list of class names that implement the + RequestInterceptor interface. This is used by the RouterClientRMService + to create the request processing pipeline for users. + + yarn.router.clientrm.interceptor-class.pipeline + org.apache.hadoop.yarn.server.router.clientrm.DefaultClientRequestInterceptor + + + + + Size of LRU cache for Router ClientRM Service. + + yarn.router.clientrm.cache-max-size + 25 + + Comma-separated list of PlacementRules to determine how applications diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestLRUCacheHashMap.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestLRUCacheHashMap.java new file mode 100644 index 0000000000..1cbb56c732 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestLRUCacheHashMap.java @@ -0,0 +1,74 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.hadoop.yarn.util; + +import java.io.IOException; + +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.junit.Assert; +import org.junit.Test; + +/** + * Test class to validate the correctness of the LRUCacheHashMap. + * + */ +public class TestLRUCacheHashMap { + + /** + * Test if the different entries are generated, and LRU cache is working as + * expected. + */ + @Test + public void testLRUCache() + throws YarnException, IOException, InterruptedException { + + int mapSize = 5; + + LRUCacheHashMap map = + new LRUCacheHashMap(mapSize, true); + + map.put("1", 1); + map.put("2", 2); + map.put("3", 3); + map.put("4", 4); + map.put("5", 5); + + Assert.assertEquals(mapSize, map.size()); + + // Check if all the elements in the map are from 1 to 5 + for (int i = 1; i < mapSize; i++) { + Assert.assertTrue(map.containsKey(Integer.toString(i))); + } + + map.put("6", 6); + map.put("3", 3); + map.put("7", 7); + map.put("8", 8); + + Assert.assertEquals(mapSize, map.size()); + + // Check if all the elements in the map are from 5 to 8 and the 3 + for (int i = 5; i < mapSize; i++) { + Assert.assertTrue(map.containsKey(Integer.toString(i))); + } + + Assert.assertTrue(map.containsKey("3")); + + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml index c9f6d79683..5f8509764e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml @@ -209,6 +209,17 @@ + + maven-jar-plugin + + + + test-jar + + test-compile + + + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java similarity index 76% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java index f584c94f7f..e302c70be3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.hadoop.yarn.server.nodemanager.amrmproxy; +package org.apache.hadoop.yarn.server; import java.io.IOException; import java.util.ArrayList; @@ -24,11 +24,12 @@ import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; -import com.google.common.base.Strings; -import org.apache.commons.lang.NotImplementedException; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.hadoop.yarn.api.ApplicationClientProtocol; +import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest; @@ -93,8 +94,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse; import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest; import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse; -import org.apache.hadoop.yarn.api.ApplicationClientProtocol; -import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerResponsePBImpl; import org.apache.hadoop.yarn.api.records.AMCommand; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; @@ -106,7 +106,10 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.NodeLabel; import org.apache.hadoop.yarn.api.records.NodeReport; +import org.apache.hadoop.yarn.api.records.ReservationAllocationState; +import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.UpdatedContainer; @@ -116,16 +119,22 @@ import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.util.Records; import org.junit.Assert; -import org.eclipse.jetty.util.log.Log; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Strings; /** * Mock Resource Manager facade implementation that exposes all the methods * implemented by the YARN RM. The behavior and the values returned by this mock - * implementation is expected by the unit test cases. So please change the - * implementation with care. + * implementation is expected by the Router/AMRMProxy unit test cases. So please + * change the implementation with care. */ -public class MockResourceManagerFacade implements - ApplicationMasterProtocol, ApplicationClientProtocol { +public class MockResourceManagerFacade + implements ApplicationClientProtocol, ApplicationMasterProtocol { + + private static final Logger LOG = + LoggerFactory.getLogger(MockResourceManagerFacade.class); private HashMap> applicationContainerIdMap = new HashMap>(); @@ -150,49 +159,48 @@ private static String getAppIdentifier() throws IOException { break; } } - return result != null ? result.getApplicationAttemptId().toString() - : ""; + return result != null ? result.getApplicationAttemptId().toString() : ""; } @Override public RegisterApplicationMasterResponse registerApplicationMaster( - RegisterApplicationMasterRequest request) throws YarnException, - IOException { + RegisterApplicationMasterRequest request) + throws YarnException, IOException { String amrmToken = getAppIdentifier(); - Log.getLog().info("Registering application attempt: " + amrmToken); + LOG.info("Registering application attempt: " + amrmToken); synchronized (applicationContainerIdMap) { - Assert.assertFalse("The application id is already registered: " - + amrmToken, applicationContainerIdMap.containsKey(amrmToken)); + Assert.assertFalse( + "The application id is already registered: " + amrmToken, + applicationContainerIdMap.containsKey(amrmToken)); // Keep track of the containers that are returned to this application - applicationContainerIdMap.put(amrmToken, - new ArrayList()); + applicationContainerIdMap.put(amrmToken, new ArrayList()); } - return RegisterApplicationMasterResponse.newInstance(null, null, null, - null, null, request.getHost(), null); + return RegisterApplicationMasterResponse.newInstance(null, null, null, null, + null, request.getHost(), null); } @Override public FinishApplicationMasterResponse finishApplicationMaster( - FinishApplicationMasterRequest request) throws YarnException, - IOException { + FinishApplicationMasterRequest request) + throws YarnException, IOException { String amrmToken = getAppIdentifier(); - Log.getLog().info("Finishing application attempt: " + amrmToken); + LOG.info("Finishing application attempt: " + amrmToken); synchronized (applicationContainerIdMap) { // Remove the containers that were being tracked for this application - Assert.assertTrue("The application id is NOT registered: " - + amrmToken, applicationContainerIdMap.containsKey(amrmToken)); + Assert.assertTrue("The application id is NOT registered: " + amrmToken, + applicationContainerIdMap.containsKey(amrmToken)); List ids = applicationContainerIdMap.remove(amrmToken); for (ContainerId c : ids) { allocatedContainerMap.remove(c); } } - return FinishApplicationMasterResponse - .newInstance(request.getFinalApplicationStatus() == FinalApplicationStatus.SUCCEEDED ? true - : false); + return FinishApplicationMasterResponse.newInstance( + request.getFinalApplicationStatus() == FinalApplicationStatus.SUCCEEDED + ? true : false); } protected ApplicationId getApplicationId(int id) { @@ -220,9 +228,8 @@ public AllocateResponse allocate(AllocateRequest request) if (request.getAskList() != null) { for (ResourceRequest rr : request.getAskList()) { for (int i = 0; i < rr.getNumContainers(); i++) { - ContainerId containerId = - ContainerId.newInstance(getApplicationAttemptId(1), - containerIndex.incrementAndGet()); + ContainerId containerId = ContainerId.newInstance( + getApplicationAttemptId(1), containerIndex.incrementAndGet()); Container container = Records.newRecord(Container.class); container.setId(containerId); container.setPriority(rr.getPriority()); @@ -230,9 +237,8 @@ public AllocateResponse allocate(AllocateRequest request) // We don't use the node for running containers in the test cases. So // it is OK to hard code it to some dummy value NodeId nodeId = - NodeId.newInstance( - !Strings.isNullOrEmpty(rr.getResourceName()) ? rr - .getResourceName() : "dummy", 1000); + NodeId.newInstance(!Strings.isNullOrEmpty(rr.getResourceName()) + ? rr.getResourceName() : "dummy", 1000); container.setNodeId(nodeId); container.setResource(rr.getCapability()); containerList.add(container); @@ -244,8 +250,7 @@ public AllocateResponse allocate(AllocateRequest request) "The application id is Not registered before allocate(): " + amrmToken, applicationContainerIdMap.containsKey(amrmToken)); - List ids = - applicationContainerIdMap.get(amrmToken); + List ids = applicationContainerIdMap.get(amrmToken); ids.add(containerId); this.allocatedContainerMap.put(containerId, container); } @@ -255,13 +260,13 @@ public AllocateResponse allocate(AllocateRequest request) if (request.getReleaseList() != null && request.getReleaseList().size() > 0) { - Log.getLog().info("Releasing containers: " - + request.getReleaseList().size()); + LOG.info("Releasing containers: " + request.getReleaseList().size()); synchronized (applicationContainerIdMap) { - Assert.assertTrue( - "The application id is not registered before allocate(): " - + amrmToken, - applicationContainerIdMap.containsKey(amrmToken)); + Assert + .assertTrue( + "The application id is not registered before allocate(): " + + amrmToken, + applicationContainerIdMap.containsKey(amrmToken)); List ids = applicationContainerIdMap.get(amrmToken); for (ContainerId id : request.getReleaseList()) { @@ -273,10 +278,9 @@ public AllocateResponse allocate(AllocateRequest request) } } - Assert.assertTrue( - "ContainerId " + id - + " being released is not valid for application: " - + conf.get("AMRMTOKEN"), found); + Assert.assertTrue("ContainerId " + id + + " being released is not valid for application: " + + conf.get("AMRMTOKEN"), found); ids.remove(id); @@ -286,9 +290,8 @@ public AllocateResponse allocate(AllocateRequest request) // returning of fake containers is ONLY done for testing purpose - for // the test code to get confirmation that the sub-cluster resource // managers received the release request - ContainerId fakeContainerId = - ContainerId.newInstance(getApplicationAttemptId(1), - containerIndex.incrementAndGet()); + ContainerId fakeContainerId = ContainerId.newInstance( + getApplicationAttemptId(1), containerIndex.incrementAndGet()); Container fakeContainer = allocatedContainerMap.get(id); fakeContainer.setId(fakeContainerId); containerList.add(fakeContainer); @@ -296,46 +299,44 @@ public AllocateResponse allocate(AllocateRequest request) } } - Log.getLog().info("Allocating containers: " + containerList.size() + LOG.info("Allocating containers: " + containerList.size() + " for application attempt: " + conf.get("AMRMTOKEN")); // Always issue a new AMRMToken as if RM rolled master key Token newAMRMToken = Token.newInstance(new byte[0], "", new byte[0], ""); - return AllocateResponse.newInstance(0, - new ArrayList(), containerList, - new ArrayList(), null, AMCommand.AM_RESYNC, 1, null, - new ArrayList(), newAMRMToken, + return AllocateResponse.newInstance(0, new ArrayList(), + containerList, new ArrayList(), null, AMCommand.AM_RESYNC, + 1, null, new ArrayList(), newAMRMToken, new ArrayList()); } @Override public GetApplicationReportResponse getApplicationReport( - GetApplicationReportRequest request) throws YarnException, - IOException { + GetApplicationReportRequest request) throws YarnException, IOException { GetApplicationReportResponse response = Records.newRecord(GetApplicationReportResponse.class); ApplicationReport report = Records.newRecord(ApplicationReport.class); report.setYarnApplicationState(YarnApplicationState.ACCEPTED); report.setApplicationId(request.getApplicationId()); - report.setCurrentApplicationAttemptId(ApplicationAttemptId - .newInstance(request.getApplicationId(), 1)); + report.setCurrentApplicationAttemptId( + ApplicationAttemptId.newInstance(request.getApplicationId(), 1)); response.setApplicationReport(report); return response; } @Override public GetApplicationAttemptReportResponse getApplicationAttemptReport( - GetApplicationAttemptReportRequest request) throws YarnException, - IOException { + GetApplicationAttemptReportRequest request) + throws YarnException, IOException { + GetApplicationAttemptReportResponse response = Records.newRecord(GetApplicationAttemptReportResponse.class); ApplicationAttemptReport report = Records.newRecord(ApplicationAttemptReport.class); report.setApplicationAttemptId(request.getApplicationAttemptId()); - report - .setYarnApplicationAttemptState(YarnApplicationAttemptState.LAUNCHED); + report.setYarnApplicationAttemptState(YarnApplicationAttemptState.LAUNCHED); response.setApplicationAttemptReport(report); return response; } @@ -343,172 +344,168 @@ public GetApplicationAttemptReportResponse getApplicationAttemptReport( @Override public GetNewApplicationResponse getNewApplication( GetNewApplicationRequest request) throws YarnException, IOException { - return null; + return GetNewApplicationResponse.newInstance(null, null, null); } @Override public SubmitApplicationResponse submitApplication( SubmitApplicationRequest request) throws YarnException, IOException { - return null; + return SubmitApplicationResponse.newInstance(); } @Override public KillApplicationResponse forceKillApplication( KillApplicationRequest request) throws YarnException, IOException { - throw new NotImplementedException(); + return KillApplicationResponse.newInstance(true); } @Override public GetClusterMetricsResponse getClusterMetrics( GetClusterMetricsRequest request) throws YarnException, IOException { - throw new NotImplementedException(); + return GetClusterMetricsResponse.newInstance(null); } @Override - public GetApplicationsResponse getApplications( - GetApplicationsRequest request) throws YarnException, IOException { - throw new NotImplementedException(); + public GetApplicationsResponse getApplications(GetApplicationsRequest request) + throws YarnException, IOException { + return GetApplicationsResponse.newInstance(null); } @Override - public GetClusterNodesResponse getClusterNodes( - GetClusterNodesRequest request) throws YarnException, IOException { - throw new NotImplementedException(); + public GetClusterNodesResponse getClusterNodes(GetClusterNodesRequest request) + throws YarnException, IOException { + return GetClusterNodesResponse.newInstance(null); } @Override public GetQueueInfoResponse getQueueInfo(GetQueueInfoRequest request) throws YarnException, IOException { - throw new NotImplementedException(); + return GetQueueInfoResponse.newInstance(null); } @Override public GetQueueUserAclsInfoResponse getQueueUserAcls( - GetQueueUserAclsInfoRequest request) throws YarnException, - IOException { - throw new NotImplementedException(); + GetQueueUserAclsInfoRequest request) throws YarnException, IOException { + return GetQueueUserAclsInfoResponse.newInstance(null); } @Override public GetDelegationTokenResponse getDelegationToken( GetDelegationTokenRequest request) throws YarnException, IOException { - throw new NotImplementedException(); + return GetDelegationTokenResponse.newInstance(null); } @Override public RenewDelegationTokenResponse renewDelegationToken( - RenewDelegationTokenRequest request) throws YarnException, - IOException { - throw new NotImplementedException(); + RenewDelegationTokenRequest request) throws YarnException, IOException { + return RenewDelegationTokenResponse.newInstance(0); } @Override public CancelDelegationTokenResponse cancelDelegationToken( - CancelDelegationTokenRequest request) throws YarnException, - IOException { - throw new NotImplementedException(); + CancelDelegationTokenRequest request) throws YarnException, IOException { + return CancelDelegationTokenResponse.newInstance(); } @Override public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues( - MoveApplicationAcrossQueuesRequest request) throws YarnException, - IOException { - throw new NotImplementedException(); + MoveApplicationAcrossQueuesRequest request) + throws YarnException, IOException { + return MoveApplicationAcrossQueuesResponse.newInstance(); } @Override public GetApplicationAttemptsResponse getApplicationAttempts( - GetApplicationAttemptsRequest request) throws YarnException, - IOException { - throw new NotImplementedException(); + GetApplicationAttemptsRequest request) throws YarnException, IOException { + return GetApplicationAttemptsResponse.newInstance(null); } @Override public GetContainerReportResponse getContainerReport( GetContainerReportRequest request) throws YarnException, IOException { - throw new NotImplementedException(); + return GetContainerReportResponse.newInstance(null); } @Override public GetContainersResponse getContainers(GetContainersRequest request) throws YarnException, IOException { - throw new NotImplementedException(); - } - - @Override - public GetNewReservationResponse getNewReservation( - GetNewReservationRequest request) throws YarnException, IOException { - throw new NotImplementedException(); + return GetContainersResponse.newInstance(null); } @Override public ReservationSubmissionResponse submitReservation( - ReservationSubmissionRequest request) throws YarnException, - IOException { - throw new NotImplementedException(); + ReservationSubmissionRequest request) throws YarnException, IOException { + return ReservationSubmissionResponse.newInstance(); } @Override public ReservationListResponse listReservations( - ReservationListRequest request) throws YarnException, - IOException { - throw new NotImplementedException(); + ReservationListRequest request) throws YarnException, IOException { + return ReservationListResponse + .newInstance(new ArrayList()); } @Override public ReservationUpdateResponse updateReservation( ReservationUpdateRequest request) throws YarnException, IOException { - throw new NotImplementedException(); + return ReservationUpdateResponse.newInstance(); } @Override public ReservationDeleteResponse deleteReservation( ReservationDeleteRequest request) throws YarnException, IOException { - throw new NotImplementedException(); + return ReservationDeleteResponse.newInstance(); } @Override public GetNodesToLabelsResponse getNodeToLabels( GetNodesToLabelsRequest request) throws YarnException, IOException { - throw new NotImplementedException(); + return GetNodesToLabelsResponse + .newInstance(new HashMap>()); } @Override public GetClusterNodeLabelsResponse getClusterNodeLabels( - GetClusterNodeLabelsRequest request) throws YarnException, - IOException { - throw new NotImplementedException(); + GetClusterNodeLabelsRequest request) throws YarnException, IOException { + return GetClusterNodeLabelsResponse.newInstance(new ArrayList()); } @Override public GetLabelsToNodesResponse getLabelsToNodes( GetLabelsToNodesRequest request) throws YarnException, IOException { - return null; + return GetLabelsToNodesResponse.newInstance(null); } @Override - public UpdateApplicationPriorityResponse updateApplicationPriority( - UpdateApplicationPriorityRequest request) throws YarnException, - IOException { - return null; + public GetNewReservationResponse getNewReservation( + GetNewReservationRequest request) throws YarnException, IOException { + return GetNewReservationResponse + .newInstance(ReservationId.newInstance(0, 0)); } - @Override - public SignalContainerResponse signalToContainer( - SignalContainerRequest request) throws IOException { -return null; -} - @Override public FailApplicationAttemptResponse failApplicationAttempt( FailApplicationAttemptRequest request) throws YarnException, IOException { - throw new NotImplementedException(); + return FailApplicationAttemptResponse.newInstance(); + } + + @Override + public UpdateApplicationPriorityResponse updateApplicationPriority( + UpdateApplicationPriorityRequest request) + throws YarnException, IOException { + return UpdateApplicationPriorityResponse.newInstance(null); + } + + @Override + public SignalContainerResponse signalToContainer( + SignalContainerRequest request) throws YarnException, IOException { + return new SignalContainerResponsePBImpl(); } @Override public UpdateApplicationTimeoutsResponse updateApplicationTimeouts( UpdateApplicationTimeoutsRequest request) throws YarnException, IOException { - throw new NotImplementedException(); + return UpdateApplicationTimeoutsResponse.newInstance(); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml index 094519afe6..28ee0d96a9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml @@ -172,6 +172,13 @@ org.fusesource.leveldbjni leveldbjni-all + + + org.apache.hadoop + hadoop-yarn-server-common + test-jar + test + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockRequestInterceptor.java index c962f97a02..1cbb2378f4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockRequestInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockRequestInterceptor.java @@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.MockResourceManagerFacade; public class MockRequestInterceptor extends AbstractRequestInterceptor { @@ -38,22 +39,21 @@ public MockRequestInterceptor() { public void init(AMRMProxyApplicationContext appContext) { super.init(appContext); - mockRM = - new MockResourceManagerFacade(new YarnConfiguration( - super.getConf()), 0); + mockRM = new MockResourceManagerFacade( + new YarnConfiguration(super.getConf()), 0); } @Override public RegisterApplicationMasterResponse registerApplicationMaster( - RegisterApplicationMasterRequest request) throws YarnException, - IOException { + RegisterApplicationMasterRequest request) + throws YarnException, IOException { return mockRM.registerApplicationMaster(request); } @Override public FinishApplicationMasterResponse finishApplicationMaster( - FinishApplicationMasterRequest request) throws YarnException, - IOException { + FinishApplicationMasterRequest request) + throws YarnException, IOException { return mockRM.finishApplicationMaster(request); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/pom.xml index 25afa5c404..89813de2bc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/pom.xml @@ -48,14 +48,33 @@ hadoop-yarn-common + + org.apache.hadoop + hadoop-common + test-jar + test + + org.apache.hadoop hadoop-yarn-server-common + test-jar + test + + + + junit + junit + test + + org.apache.rat + apache-rat-plugin + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/Router.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/Router.java index 7be8a596f7..7cfabf5af7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/Router.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/Router.java @@ -18,6 +18,20 @@ package org.apache.hadoop.yarn.server.router; +import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.util.ShutdownHookManager; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.router.clientrm.RouterClientRMService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * The router is a stateless YARN component which is the entry point to the * cluster. It can be deployed on multiple nodes behind a Virtual IP (VIP) with @@ -33,6 +47,88 @@ * This provides a placeholder for throttling mis-behaving clients (YARN-1546) * and masks the access to multiple RMs (YARN-3659). */ -public class Router{ +public class Router extends CompositeService { + private static final Logger LOG = LoggerFactory.getLogger(Router.class); + private static CompositeServiceShutdownHook routerShutdownHook; + private Configuration conf; + private AtomicBoolean isStopping = new AtomicBoolean(false); + private RouterClientRMService clientRMProxyService; + + /** + * Priority of the Router shutdown hook. + */ + public static final int SHUTDOWN_HOOK_PRIORITY = 30; + + public Router() { + super(Router.class.getName()); + } + + protected void doSecureLogin() throws IOException { + // TODO YARN-6539 Create SecureLogin inside Router + } + + @Override + protected void serviceInit(Configuration config) throws Exception { + this.conf = config; + clientRMProxyService = createClientRMProxyService(); + addService(clientRMProxyService); + super.serviceInit(conf); + } + + @Override + protected void serviceStart() throws Exception { + try { + doSecureLogin(); + } catch (IOException e) { + throw new YarnRuntimeException("Failed Router login", e); + } + super.serviceStart(); + } + + @Override + protected void serviceStop() throws Exception { + if (isStopping.getAndSet(true)) { + return; + } + super.serviceStop(); + } + + protected void shutDown() { + new Thread() { + @Override + public void run() { + Router.this.stop(); + } + }.start(); + } + + protected RouterClientRMService createClientRMProxyService() { + return new RouterClientRMService(); + } + + public static void main(String[] argv) { + Configuration conf = new YarnConfiguration(); + Thread + .setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler()); + StringUtils.startupShutdownMessage(Router.class, argv, LOG); + Router router = new Router(); + try { + + // Remove the old hook if we are rebooting. + if (null != routerShutdownHook) { + ShutdownHookManager.get().removeShutdownHook(routerShutdownHook); + } + + routerShutdownHook = new CompositeServiceShutdownHook(router); + ShutdownHookManager.get().addShutdownHook(routerShutdownHook, + SHUTDOWN_HOOK_PRIORITY); + + router.init(conf); + router.start(); + } catch (Throwable t) { + LOG.error("Error starting Router", t); + System.exit(-1); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/AbstractClientRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/AbstractClientRequestInterceptor.java new file mode 100644 index 0000000000..fc6a118da2 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/AbstractClientRequestInterceptor.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.router.clientrm; + +import org.apache.hadoop.conf.Configuration; + +/** + * Implements the RequestInterceptor interface and provides common functionality + * which can can be used and/or extended by other concrete intercepter classes. + * + */ +public abstract class AbstractClientRequestInterceptor + implements ClientRequestInterceptor { + private Configuration conf; + private ClientRequestInterceptor nextInterceptor; + + /** + * Sets the {@code RequestInterceptor} in the chain. + */ + @Override + public void setNextInterceptor(ClientRequestInterceptor nextInterceptor) { + this.nextInterceptor = nextInterceptor; + } + + /** + * Sets the {@link Configuration}. + */ + + @Override + public void setConf(Configuration conf) { + this.conf = conf; + if (this.nextInterceptor != null) { + this.nextInterceptor.setConf(conf); + } + } + + /** + * Gets the {@link Configuration}. + */ + @Override + public Configuration getConf() { + return this.conf; + } + + /** + * Initializes the {@code ClientRequestInterceptor}. + */ + @Override + public void init(String user) { + if (this.nextInterceptor != null) { + this.nextInterceptor.init(user); + } + } + + /** + * Disposes the {@code ClientRequestInterceptor}. + */ + @Override + public void shutdown() { + if (this.nextInterceptor != null) { + this.nextInterceptor.shutdown(); + } + } + + /** + * Gets the next {@link ClientRequestInterceptor} in the chain. + */ + @Override + public ClientRequestInterceptor getNextInterceptor() { + return this.nextInterceptor; + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/ClientRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/ClientRequestInterceptor.java new file mode 100644 index 0000000000..2f8fb93634 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/ClientRequestInterceptor.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.router.clientrm; + +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.yarn.api.ApplicationClientProtocol; + +/** + * Defines the contract to be implemented by the request intercepter classes, + * that can be used to intercept and inspect messages sent from the client to + * the resource manager. + */ +public interface ClientRequestInterceptor + extends ApplicationClientProtocol, Configurable { + /** + * This method is called for initializing the intercepter. This is guaranteed + * to be called only once in the lifetime of this instance. + * + * @param user the name of the client + */ + void init(String user); + + /** + * This method is called to release the resources held by the intercepter. + * This will be called when the application pipeline is being destroyed. The + * concrete implementations should dispose the resources and forward the + * request to the next intercepter, if any. + */ + void shutdown(); + + /** + * Sets the next intercepter in the pipeline. The concrete implementation of + * this interface should always pass the request to the nextInterceptor after + * inspecting the message. The last intercepter in the chain is responsible to + * send the messages to the resource manager service and so the last + * intercepter will not receive this method call. + * + * @param nextInterceptor the ClientRequestInterceptor to set in the pipeline + */ + void setNextInterceptor(ClientRequestInterceptor nextInterceptor); + + /** + * Returns the next intercepter in the chain. + * + * @return the next intercepter in the chain + */ + ClientRequestInterceptor getNextInterceptor(); + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/DefaultClientRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/DefaultClientRequestInterceptor.java new file mode 100644 index 0000000000..12b933b848 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/DefaultClientRequestInterceptor.java @@ -0,0 +1,334 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.router.clientrm; + +import java.io.IOException; +import java.security.PrivilegedExceptionAction; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.ApplicationClientProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationListRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationListResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse; +import org.apache.hadoop.yarn.client.ClientRMProxy; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; + +/** + * Extends the AbstractRequestInterceptorClient class and provides an + * implementation that simply forwards the client requests to the cluster + * resource manager. + * + */ +public class DefaultClientRequestInterceptor + extends AbstractClientRequestInterceptor { + private static final Logger LOG = + LoggerFactory.getLogger(DefaultClientRequestInterceptor.class); + private ApplicationClientProtocol clientRMProxy; + private UserGroupInformation user = null; + + @Override + public void init(String userName) { + super.init(userName); + try { + // Do not create a proxy user if user name matches the user name on + // current UGI + if (userName.equalsIgnoreCase( + UserGroupInformation.getCurrentUser().getUserName())) { + user = UserGroupInformation.getCurrentUser(); + } else { + user = UserGroupInformation.createProxyUser(userName, + UserGroupInformation.getCurrentUser()); + } + + final Configuration conf = this.getConf(); + + clientRMProxy = + user.doAs(new PrivilegedExceptionAction() { + @Override + public ApplicationClientProtocol run() throws Exception { + return ClientRMProxy.createRMProxy(conf, + ApplicationClientProtocol.class); + } + }); + } catch (IOException e) { + String message = "Error while creating Router ClientRM Service for user:"; + if (user != null) { + message += ", user: " + user; + } + + LOG.info(message); + throw new YarnRuntimeException(message, e); + } catch (Exception e) { + throw new YarnRuntimeException(e); + } + } + + @Override + public void setNextInterceptor(ClientRequestInterceptor next) { + throw new YarnRuntimeException( + "setNextInterceptor is being called on DefaultRequestInterceptor," + + "which should be the last one in the chain " + + "Check if the interceptor pipeline configuration is correct"); + } + + @Override + public GetNewApplicationResponse getNewApplication( + GetNewApplicationRequest request) throws YarnException, IOException { + return clientRMProxy.getNewApplication(request); + } + + @Override + public SubmitApplicationResponse submitApplication( + SubmitApplicationRequest request) throws YarnException, IOException { + return clientRMProxy.submitApplication(request); + } + + @Override + public KillApplicationResponse forceKillApplication( + KillApplicationRequest request) throws YarnException, IOException { + return clientRMProxy.forceKillApplication(request); + } + + @Override + public GetClusterMetricsResponse getClusterMetrics( + GetClusterMetricsRequest request) throws YarnException, IOException { + return clientRMProxy.getClusterMetrics(request); + } + + @Override + public GetClusterNodesResponse getClusterNodes(GetClusterNodesRequest request) + throws YarnException, IOException { + return clientRMProxy.getClusterNodes(request); + } + + @Override + public GetQueueInfoResponse getQueueInfo(GetQueueInfoRequest request) + throws YarnException, IOException { + return clientRMProxy.getQueueInfo(request); + } + + @Override + public GetQueueUserAclsInfoResponse getQueueUserAcls( + GetQueueUserAclsInfoRequest request) throws YarnException, IOException { + return clientRMProxy.getQueueUserAcls(request); + } + + @Override + public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues( + MoveApplicationAcrossQueuesRequest request) + throws YarnException, IOException { + return clientRMProxy.moveApplicationAcrossQueues(request); + } + + @Override + public GetNewReservationResponse getNewReservation( + GetNewReservationRequest request) throws YarnException, IOException { + return clientRMProxy.getNewReservation(request); + } + + @Override + public ReservationSubmissionResponse submitReservation( + ReservationSubmissionRequest request) throws YarnException, IOException { + return clientRMProxy.submitReservation(request); + } + + @Override + public ReservationListResponse listReservations( + ReservationListRequest request) throws YarnException, IOException { + return clientRMProxy.listReservations(request); + } + + @Override + public ReservationUpdateResponse updateReservation( + ReservationUpdateRequest request) throws YarnException, IOException { + return clientRMProxy.updateReservation(request); + } + + @Override + public ReservationDeleteResponse deleteReservation( + ReservationDeleteRequest request) throws YarnException, IOException { + return clientRMProxy.deleteReservation(request); + } + + @Override + public GetNodesToLabelsResponse getNodeToLabels( + GetNodesToLabelsRequest request) throws YarnException, IOException { + return clientRMProxy.getNodeToLabels(request); + } + + @Override + public GetLabelsToNodesResponse getLabelsToNodes( + GetLabelsToNodesRequest request) throws YarnException, IOException { + return clientRMProxy.getLabelsToNodes(request); + } + + @Override + public GetClusterNodeLabelsResponse getClusterNodeLabels( + GetClusterNodeLabelsRequest request) throws YarnException, IOException { + return clientRMProxy.getClusterNodeLabels(request); + } + + @Override + public GetApplicationReportResponse getApplicationReport( + GetApplicationReportRequest request) throws YarnException, IOException { + return clientRMProxy.getApplicationReport(request); + } + + @Override + public GetApplicationsResponse getApplications(GetApplicationsRequest request) + throws YarnException, IOException { + return clientRMProxy.getApplications(request); + } + + @Override + public GetApplicationAttemptReportResponse getApplicationAttemptReport( + GetApplicationAttemptReportRequest request) + throws YarnException, IOException { + return clientRMProxy.getApplicationAttemptReport(request); + } + + @Override + public GetApplicationAttemptsResponse getApplicationAttempts( + GetApplicationAttemptsRequest request) throws YarnException, IOException { + return clientRMProxy.getApplicationAttempts(request); + } + + @Override + public GetContainerReportResponse getContainerReport( + GetContainerReportRequest request) throws YarnException, IOException { + return clientRMProxy.getContainerReport(request); + } + + @Override + public GetContainersResponse getContainers(GetContainersRequest request) + throws YarnException, IOException { + return clientRMProxy.getContainers(request); + } + + @Override + public GetDelegationTokenResponse getDelegationToken( + GetDelegationTokenRequest request) throws YarnException, IOException { + return clientRMProxy.getDelegationToken(request); + } + + @Override + public RenewDelegationTokenResponse renewDelegationToken( + RenewDelegationTokenRequest request) throws YarnException, IOException { + return clientRMProxy.renewDelegationToken(request); + } + + @Override + public CancelDelegationTokenResponse cancelDelegationToken( + CancelDelegationTokenRequest request) throws YarnException, IOException { + return clientRMProxy.cancelDelegationToken(request); + } + + @Override + public FailApplicationAttemptResponse failApplicationAttempt( + FailApplicationAttemptRequest request) throws YarnException, IOException { + return clientRMProxy.failApplicationAttempt(request); + } + + @Override + public UpdateApplicationPriorityResponse updateApplicationPriority( + UpdateApplicationPriorityRequest request) + throws YarnException, IOException { + return clientRMProxy.updateApplicationPriority(request); + } + + @Override + public SignalContainerResponse signalToContainer( + SignalContainerRequest request) throws YarnException, IOException { + return clientRMProxy.signalToContainer(request); + } + + @Override + public UpdateApplicationTimeoutsResponse updateApplicationTimeouts( + UpdateApplicationTimeoutsRequest request) + throws YarnException, IOException { + return clientRMProxy.updateApplicationTimeouts(request); + } + + @VisibleForTesting + public void setRMClient(ApplicationClientProtocol clientRM) { + this.clientRMProxy = clientRM; + + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterClientRMService.java new file mode 100644 index 0000000000..00016dd7ae --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterClientRMService.java @@ -0,0 +1,544 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.router.clientrm; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.ApplicationClientProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationListRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationListResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.util.LRUCacheHashMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; + +/** + * RouterClientRMService is a service that runs on each router that can be used + * to intercept and inspect ApplicationClientProtocol messages from client to + * the cluster resource manager. It listens ApplicationClientProtocol messages + * from the client and creates a request intercepting pipeline instance for each + * client. The pipeline is a chain of intercepter instances that can inspect and + * modify the request/response as needed. The main difference with + * AMRMProxyService is the protocol they implement. + */ +public class RouterClientRMService extends AbstractService + implements ApplicationClientProtocol { + + private static final Logger LOG = + LoggerFactory.getLogger(RouterClientRMService.class); + + private Server server; + private InetSocketAddress listenerEndpoint; + + // For each user we store an interceptors' pipeline. + // For performance issue we use LRU cache to keep in memory the newest ones + // and remove the oldest used ones. + private Map userPipelineMap; + + public RouterClientRMService() { + super(RouterClientRMService.class.getName()); + } + + @Override + protected void serviceStart() throws Exception { + LOG.info("Starting Router ClientRMService"); + Configuration conf = getConfig(); + YarnRPC rpc = YarnRPC.create(conf); + UserGroupInformation.setConfiguration(conf); + + this.listenerEndpoint = + conf.getSocketAddr(YarnConfiguration.ROUTER_CLIENTRM_ADDRESS, + YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_ADDRESS, + YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_PORT); + + int maxCacheSize = + conf.getInt(YarnConfiguration.ROUTER_CLIENTRM_PIPELINE_CACHE_MAX_SIZE, + YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_PIPELINE_CACHE_MAX_SIZE); + this.userPipelineMap = Collections.synchronizedMap( + new LRUCacheHashMap( + maxCacheSize, true)); + + Configuration serverConf = new Configuration(conf); + + int numWorkerThreads = + serverConf.getInt(YarnConfiguration.RM_CLIENT_THREAD_COUNT, + YarnConfiguration.DEFAULT_RM_CLIENT_THREAD_COUNT); + + this.server = rpc.getServer(ApplicationClientProtocol.class, this, + listenerEndpoint, serverConf, null, numWorkerThreads); + + this.server.start(); + LOG.info("Router ClientRMService listening on address: " + + this.server.getListenerAddress()); + super.serviceStart(); + } + + @Override + protected void serviceStop() throws Exception { + LOG.info("Stopping Router ClientRMService"); + if (this.server != null) { + this.server.stop(); + } + userPipelineMap.clear(); + super.serviceStop(); + } + + /** + * Returns the comma separated intercepter class names from the configuration. + * + * @param conf + * @return the intercepter class names as an instance of ArrayList + */ + private List getInterceptorClassNames(Configuration conf) { + String configuredInterceptorClassNames = + conf.get(YarnConfiguration.ROUTER_CLIENTRM_INTERCEPTOR_CLASS_PIPELINE, + YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_INTERCEPTOR_CLASS); + + List interceptorClassNames = new ArrayList(); + Collection tempList = + StringUtils.getStringCollection(configuredInterceptorClassNames); + for (String item : tempList) { + interceptorClassNames.add(item.trim()); + } + + return interceptorClassNames; + } + + @Override + public GetNewApplicationResponse getNewApplication( + GetNewApplicationRequest request) throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().getNewApplication(request); + } + + @Override + public SubmitApplicationResponse submitApplication( + SubmitApplicationRequest request) throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().submitApplication(request); + } + + @Override + public KillApplicationResponse forceKillApplication( + KillApplicationRequest request) throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().forceKillApplication(request); + } + + @Override + public GetClusterMetricsResponse getClusterMetrics( + GetClusterMetricsRequest request) throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().getClusterMetrics(request); + } + + @Override + public GetClusterNodesResponse getClusterNodes(GetClusterNodesRequest request) + throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().getClusterNodes(request); + } + + @Override + public GetQueueInfoResponse getQueueInfo(GetQueueInfoRequest request) + throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().getQueueInfo(request); + } + + @Override + public GetQueueUserAclsInfoResponse getQueueUserAcls( + GetQueueUserAclsInfoRequest request) throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().getQueueUserAcls(request); + } + + @Override + public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues( + MoveApplicationAcrossQueuesRequest request) + throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().moveApplicationAcrossQueues(request); + } + + @Override + public GetNewReservationResponse getNewReservation( + GetNewReservationRequest request) throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().getNewReservation(request); + } + + @Override + public ReservationSubmissionResponse submitReservation( + ReservationSubmissionRequest request) throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().submitReservation(request); + } + + @Override + public ReservationListResponse listReservations( + ReservationListRequest request) throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().listReservations(request); + } + + @Override + public ReservationUpdateResponse updateReservation( + ReservationUpdateRequest request) throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().updateReservation(request); + } + + @Override + public ReservationDeleteResponse deleteReservation( + ReservationDeleteRequest request) throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().deleteReservation(request); + } + + @Override + public GetNodesToLabelsResponse getNodeToLabels( + GetNodesToLabelsRequest request) throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().getNodeToLabels(request); + } + + @Override + public GetLabelsToNodesResponse getLabelsToNodes( + GetLabelsToNodesRequest request) throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().getLabelsToNodes(request); + } + + @Override + public GetClusterNodeLabelsResponse getClusterNodeLabels( + GetClusterNodeLabelsRequest request) throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().getClusterNodeLabels(request); + } + + @Override + public GetApplicationReportResponse getApplicationReport( + GetApplicationReportRequest request) throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().getApplicationReport(request); + } + + @Override + public GetApplicationsResponse getApplications(GetApplicationsRequest request) + throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().getApplications(request); + } + + @Override + public GetApplicationAttemptReportResponse getApplicationAttemptReport( + GetApplicationAttemptReportRequest request) + throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().getApplicationAttemptReport(request); + } + + @Override + public GetApplicationAttemptsResponse getApplicationAttempts( + GetApplicationAttemptsRequest request) throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().getApplicationAttempts(request); + } + + @Override + public GetContainerReportResponse getContainerReport( + GetContainerReportRequest request) throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().getContainerReport(request); + } + + @Override + public GetContainersResponse getContainers(GetContainersRequest request) + throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().getContainers(request); + } + + @Override + public GetDelegationTokenResponse getDelegationToken( + GetDelegationTokenRequest request) throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().getDelegationToken(request); + } + + @Override + public RenewDelegationTokenResponse renewDelegationToken( + RenewDelegationTokenRequest request) throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().renewDelegationToken(request); + } + + @Override + public CancelDelegationTokenResponse cancelDelegationToken( + CancelDelegationTokenRequest request) throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().cancelDelegationToken(request); + } + + @Override + public FailApplicationAttemptResponse failApplicationAttempt( + FailApplicationAttemptRequest request) throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().failApplicationAttempt(request); + } + + @Override + public UpdateApplicationPriorityResponse updateApplicationPriority( + UpdateApplicationPriorityRequest request) + throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().updateApplicationPriority(request); + } + + @Override + public SignalContainerResponse signalToContainer( + SignalContainerRequest request) throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().signalToContainer(request); + } + + @Override + public UpdateApplicationTimeoutsResponse updateApplicationTimeouts( + UpdateApplicationTimeoutsRequest request) + throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().updateApplicationTimeouts(request); + } + + private RequestInterceptorChainWrapper getInterceptorChain() + throws IOException { + String user = UserGroupInformation.getCurrentUser().getUserName(); + if (!userPipelineMap.containsKey(user)) { + initializePipeline(user); + } + return userPipelineMap.get(user); + } + + /** + * Gets the Request intercepter chains for all the users. + * + * @return the request intercepter chains. + */ + @VisibleForTesting + protected Map getPipelines() { + return this.userPipelineMap; + } + + /** + * This method creates and returns reference of the first intercepter in the + * chain of request intercepter instances. + * + * @return the reference of the first intercepter in the chain + */ + @VisibleForTesting + protected ClientRequestInterceptor createRequestInterceptorChain() { + Configuration conf = getConfig(); + + List interceptorClassNames = getInterceptorClassNames(conf); + + ClientRequestInterceptor pipeline = null; + ClientRequestInterceptor current = null; + for (String interceptorClassName : interceptorClassNames) { + try { + Class interceptorClass = conf.getClassByName(interceptorClassName); + if (ClientRequestInterceptor.class.isAssignableFrom(interceptorClass)) { + ClientRequestInterceptor interceptorInstance = + (ClientRequestInterceptor) ReflectionUtils + .newInstance(interceptorClass, conf); + if (pipeline == null) { + pipeline = interceptorInstance; + current = interceptorInstance; + continue; + } else { + current.setNextInterceptor(interceptorInstance); + current = interceptorInstance; + } + } else { + throw new YarnRuntimeException( + "Class: " + interceptorClassName + " not instance of " + + ClientRequestInterceptor.class.getCanonicalName()); + } + } catch (ClassNotFoundException e) { + throw new YarnRuntimeException( + "Could not instantiate ApplicationClientRequestInterceptor: " + + interceptorClassName, + e); + } + } + + if (pipeline == null) { + throw new YarnRuntimeException( + "RequestInterceptor pipeline is not configured in the system"); + } + return pipeline; + } + + /** + * Initializes the request intercepter pipeline for the specified application. + * + * @param user + */ + private void initializePipeline(String user) { + RequestInterceptorChainWrapper chainWrapper = null; + synchronized (this.userPipelineMap) { + if (this.userPipelineMap.containsKey(user)) { + LOG.info("Request to start an already existing user: {}" + + " was received, so ignoring.", user); + return; + } + + chainWrapper = new RequestInterceptorChainWrapper(); + this.userPipelineMap.put(user, chainWrapper); + } + + // We register the pipeline instance in the map first and then initialize it + // later because chain initialization can be expensive and we would like to + // release the lock as soon as possible to prevent other applications from + // blocking when one application's chain is initializing + LOG.info("Initializing request processing pipeline for application " + + "for the user: {}", user); + + try { + ClientRequestInterceptor interceptorChain = + this.createRequestInterceptorChain(); + interceptorChain.init(user); + chainWrapper.init(interceptorChain); + } catch (Exception e) { + synchronized (this.userPipelineMap) { + this.userPipelineMap.remove(user); + } + throw e; + } + } + + /** + * Private structure for encapsulating RequestInterceptor and user instances. + * + */ + @Private + public static class RequestInterceptorChainWrapper { + private ClientRequestInterceptor rootInterceptor; + + /** + * Initializes the wrapper with the specified parameters. + * + * @param interceptor the first interceptor in the pipeline + */ + public synchronized void init(ClientRequestInterceptor interceptor) { + this.rootInterceptor = interceptor; + } + + /** + * Gets the root request intercepter. + * + * @return the root request intercepter + */ + public synchronized ClientRequestInterceptor getRootInterceptor() { + return rootInterceptor; + } + + /** + * Shutdown the chain of interceptors when the object is destroyed. + */ + @Override + protected void finalize() { + rootInterceptor.shutdown(); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/package-info.java similarity index 87% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouter.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/package-info.java index a31d6b944d..7d1dadd373 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouter.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/package-info.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -16,11 +16,5 @@ * limitations under the License. */ -package org.apache.hadoop.yarn.server.router; - -/** - * Test class for YARN Router. - */ -public class TestRouter { - -} +/** Router ClientRM Proxy Service package. **/ +package org.apache.hadoop.yarn.server.router.clientrm; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/BaseRouterClientRMTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/BaseRouterClientRMTest.java new file mode 100644 index 0000000000..a283a624aa --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/BaseRouterClientRMTest.java @@ -0,0 +1,574 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.router.clientrm; + +import java.io.IOException; +import java.security.PrivilegedExceptionAction; +import java.util.Collections; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ReservationDefinition; +import org.apache.hadoop.yarn.api.records.ReservationId; +import org.apache.hadoop.yarn.api.records.ReservationRequest; +import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter; +import org.apache.hadoop.yarn.api.records.ReservationRequests; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.Token; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.UTCClock; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; + +/** + * Base class for all the RouterClientRMService test cases. It provides utility + * methods that can be used by the concrete test case classes. + * + */ +public abstract class BaseRouterClientRMTest { + + /** + * The RouterClientRMService instance that will be used by all the test cases. + */ + private MockRouterClientRMService clientrmService; + /** + * Thread pool used for asynchronous operations. + */ + private static ExecutorService threadpool = Executors.newCachedThreadPool(); + private Configuration conf; + private AsyncDispatcher dispatcher; + + public final static int TEST_MAX_CACHE_SIZE = 10; + + protected MockRouterClientRMService getRouterClientRMService() { + Assert.assertNotNull(this.clientrmService); + return this.clientrmService; + } + + @Before + public void setUp() { + this.conf = new YarnConfiguration(); + String mockPassThroughInterceptorClass = + PassThroughClientRequestInterceptor.class.getName(); + + // Create a request intercepter pipeline for testing. The last one in the + // chain will call the mock resource manager. The others in the chain will + // simply forward it to the next one in the chain + this.conf.set(YarnConfiguration.ROUTER_CLIENTRM_INTERCEPTOR_CLASS_PIPELINE, + mockPassThroughInterceptorClass + "," + mockPassThroughInterceptorClass + + "," + mockPassThroughInterceptorClass + "," + + MockClientRequestInterceptor.class.getName()); + + this.conf.setInt(YarnConfiguration.ROUTER_CLIENTRM_PIPELINE_CACHE_MAX_SIZE, + TEST_MAX_CACHE_SIZE); + + this.dispatcher = new AsyncDispatcher(); + this.dispatcher.init(conf); + this.dispatcher.start(); + this.clientrmService = createAndStartRouterClientRMService(); + } + + @After + public void tearDown() { + if (clientrmService != null) { + clientrmService.stop(); + clientrmService = null; + } + if (this.dispatcher != null) { + this.dispatcher.stop(); + } + } + + protected ExecutorService getThreadPool() { + return threadpool; + } + + protected MockRouterClientRMService createAndStartRouterClientRMService() { + MockRouterClientRMService svc = new MockRouterClientRMService(); + svc.init(conf); + svc.start(); + return svc; + } + + protected static class MockRouterClientRMService + extends RouterClientRMService { + public MockRouterClientRMService() { + super(); + } + } + + protected GetNewApplicationResponse getNewApplication(String user) + throws YarnException, IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user) + .doAs(new PrivilegedExceptionAction() { + @Override + public GetNewApplicationResponse run() throws Exception { + GetNewApplicationRequest req = + GetNewApplicationRequest.newInstance(); + GetNewApplicationResponse response = + getRouterClientRMService().getNewApplication(req); + return response; + } + }); + } + + protected SubmitApplicationResponse submitApplication( + final ApplicationId appId, String user) + throws YarnException, IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user) + .doAs(new PrivilegedExceptionAction() { + @Override + public SubmitApplicationResponse run() throws Exception { + ApplicationSubmissionContext context = + ApplicationSubmissionContext.newInstance(appId, "", "", null, + null, false, false, -1, null, null); + SubmitApplicationRequest req = + SubmitApplicationRequest.newInstance(context); + SubmitApplicationResponse response = + getRouterClientRMService().submitApplication(req); + return response; + } + }); + } + + protected KillApplicationResponse forceKillApplication( + final ApplicationId appId, String user) + throws YarnException, IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user) + .doAs(new PrivilegedExceptionAction() { + @Override + public KillApplicationResponse run() throws Exception { + KillApplicationRequest req = + KillApplicationRequest.newInstance(appId); + KillApplicationResponse response = + getRouterClientRMService().forceKillApplication(req); + return response; + } + }); + } + + protected GetClusterMetricsResponse getClusterMetrics(String user) + throws YarnException, IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user) + .doAs(new PrivilegedExceptionAction() { + @Override + public GetClusterMetricsResponse run() throws Exception { + GetClusterMetricsRequest req = + GetClusterMetricsRequest.newInstance(); + GetClusterMetricsResponse response = + getRouterClientRMService().getClusterMetrics(req); + return response; + } + }); + } + + protected GetClusterNodesResponse getClusterNodes(String user) + throws YarnException, IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user) + .doAs(new PrivilegedExceptionAction() { + @Override + public GetClusterNodesResponse run() throws Exception { + GetClusterNodesRequest req = GetClusterNodesRequest.newInstance(); + GetClusterNodesResponse response = + getRouterClientRMService().getClusterNodes(req); + return response; + } + }); + } + + protected GetQueueInfoResponse getQueueInfo(String user) + throws YarnException, IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user) + .doAs(new PrivilegedExceptionAction() { + @Override + public GetQueueInfoResponse run() throws Exception { + GetQueueInfoRequest req = + GetQueueInfoRequest.newInstance("default", false, false, false); + GetQueueInfoResponse response = + getRouterClientRMService().getQueueInfo(req); + return response; + } + }); + } + + protected GetQueueUserAclsInfoResponse getQueueUserAcls(String user) + throws YarnException, IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user) + .doAs(new PrivilegedExceptionAction() { + @Override + public GetQueueUserAclsInfoResponse run() throws Exception { + GetQueueUserAclsInfoRequest req = + GetQueueUserAclsInfoRequest.newInstance(); + GetQueueUserAclsInfoResponse response = + getRouterClientRMService().getQueueUserAcls(req); + return response; + } + }); + } + + protected MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues( + String user, final ApplicationId appId) + throws YarnException, IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user).doAs( + new PrivilegedExceptionAction() { + @Override + public MoveApplicationAcrossQueuesResponse run() throws Exception { + + MoveApplicationAcrossQueuesRequest req = + MoveApplicationAcrossQueuesRequest.newInstance(appId, + "newQueue"); + MoveApplicationAcrossQueuesResponse response = + getRouterClientRMService().moveApplicationAcrossQueues(req); + return response; + } + }); + } + + public GetNewReservationResponse getNewReservation(String user) + throws YarnException, IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user) + .doAs(new PrivilegedExceptionAction() { + @Override + public GetNewReservationResponse run() throws Exception { + GetNewReservationResponse response = getRouterClientRMService() + .getNewReservation(GetNewReservationRequest.newInstance()); + return response; + } + }); + } + + protected ReservationSubmissionResponse submitReservation(String user, + final ReservationId reservationId) + throws YarnException, IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user) + .doAs(new PrivilegedExceptionAction() { + @Override + public ReservationSubmissionResponse run() throws Exception { + Clock clock = new UTCClock(); + long arrival = clock.getTime(); + long duration = 60000; + long deadline = (long) (arrival + 1.05 * duration); + + ReservationSubmissionRequest req = createSimpleReservationRequest(1, + arrival, deadline, duration, reservationId); + ReservationSubmissionResponse response = + getRouterClientRMService().submitReservation(req); + return response; + } + }); + } + + protected ReservationUpdateResponse updateReservation(String user, + final ReservationId reservationId) + throws YarnException, IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user) + .doAs(new PrivilegedExceptionAction() { + @Override + public ReservationUpdateResponse run() throws Exception { + Clock clock = new UTCClock(); + long arrival = clock.getTime(); + long duration = 60000; + long deadline = (long) (arrival + 1.05 * duration); + ReservationDefinition rDef = + createSimpleReservationRequest(1, arrival, deadline, duration, + reservationId).getReservationDefinition(); + + ReservationUpdateRequest req = + ReservationUpdateRequest.newInstance(rDef, reservationId); + ReservationUpdateResponse response = + getRouterClientRMService().updateReservation(req); + return response; + } + }); + } + + protected ReservationDeleteResponse deleteReservation(String user, + final ReservationId reservationId) + throws YarnException, IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user) + .doAs(new PrivilegedExceptionAction() { + @Override + public ReservationDeleteResponse run() throws Exception { + ReservationDeleteRequest req = + ReservationDeleteRequest.newInstance(reservationId); + ReservationDeleteResponse response = + getRouterClientRMService().deleteReservation(req); + return response; + } + }); + } + + protected GetNodesToLabelsResponse getNodeToLabels(String user) + throws YarnException, IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user) + .doAs(new PrivilegedExceptionAction() { + @Override + public GetNodesToLabelsResponse run() throws Exception { + GetNodesToLabelsRequest req = GetNodesToLabelsRequest.newInstance(); + GetNodesToLabelsResponse response = + getRouterClientRMService().getNodeToLabels(req); + return response; + } + }); + } + + protected GetLabelsToNodesResponse getLabelsToNodes(String user) + throws YarnException, IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user) + .doAs(new PrivilegedExceptionAction() { + @Override + public GetLabelsToNodesResponse run() throws Exception { + GetLabelsToNodesRequest req = GetLabelsToNodesRequest.newInstance(); + GetLabelsToNodesResponse response = + getRouterClientRMService().getLabelsToNodes(req); + return response; + } + }); + } + + protected GetClusterNodeLabelsResponse getClusterNodeLabels(String user) + throws YarnException, IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user) + .doAs(new PrivilegedExceptionAction() { + @Override + public GetClusterNodeLabelsResponse run() throws Exception { + GetClusterNodeLabelsRequest req = + GetClusterNodeLabelsRequest.newInstance(); + GetClusterNodeLabelsResponse response = + getRouterClientRMService().getClusterNodeLabels(req); + return response; + } + }); + } + + protected GetApplicationReportResponse getApplicationReport(String user, + final ApplicationId appId) + throws YarnException, IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user) + .doAs(new PrivilegedExceptionAction() { + @Override + public GetApplicationReportResponse run() throws Exception { + GetApplicationReportRequest req = + GetApplicationReportRequest.newInstance(appId); + GetApplicationReportResponse response = + getRouterClientRMService().getApplicationReport(req); + return response; + } + }); + } + + protected GetApplicationsResponse getApplications(String user) + throws YarnException, IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user) + .doAs(new PrivilegedExceptionAction() { + @Override + public GetApplicationsResponse run() throws Exception { + GetApplicationsRequest req = GetApplicationsRequest.newInstance(); + GetApplicationsResponse response = + getRouterClientRMService().getApplications(req); + return response; + } + }); + } + + protected GetApplicationAttemptReportResponse getApplicationAttemptReport( + String user, final ApplicationAttemptId appAttemptId) + throws YarnException, IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user).doAs( + new PrivilegedExceptionAction() { + @Override + public GetApplicationAttemptReportResponse run() throws Exception { + GetApplicationAttemptReportRequest req = + GetApplicationAttemptReportRequest.newInstance(appAttemptId); + GetApplicationAttemptReportResponse response = + getRouterClientRMService().getApplicationAttemptReport(req); + return response; + } + }); + } + + protected GetApplicationAttemptsResponse getApplicationAttempts(String user, + final ApplicationId applicationId) + throws YarnException, IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user) + .doAs(new PrivilegedExceptionAction() { + @Override + public GetApplicationAttemptsResponse run() throws Exception { + GetApplicationAttemptsRequest req = + GetApplicationAttemptsRequest.newInstance(applicationId); + GetApplicationAttemptsResponse response = + getRouterClientRMService().getApplicationAttempts(req); + return response; + } + }); + } + + protected GetContainerReportResponse getContainerReport(String user, + final ContainerId containerId) + throws YarnException, IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user) + .doAs(new PrivilegedExceptionAction() { + @Override + public GetContainerReportResponse run() throws Exception { + GetContainerReportRequest req = + GetContainerReportRequest.newInstance(containerId); + GetContainerReportResponse response = + getRouterClientRMService().getContainerReport(req); + return response; + } + }); + } + + protected GetContainersResponse getContainers(String user, + final ApplicationAttemptId appAttemptId) + throws YarnException, IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user) + .doAs(new PrivilegedExceptionAction() { + @Override + public GetContainersResponse run() throws Exception { + GetContainersRequest req = + GetContainersRequest.newInstance(appAttemptId); + GetContainersResponse response = + getRouterClientRMService().getContainers(req); + return response; + } + }); + } + + protected GetDelegationTokenResponse getDelegationToken(final String user) + throws YarnException, IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user) + .doAs(new PrivilegedExceptionAction() { + @Override + public GetDelegationTokenResponse run() throws Exception { + GetDelegationTokenRequest req = + GetDelegationTokenRequest.newInstance(user); + GetDelegationTokenResponse response = + getRouterClientRMService().getDelegationToken(req); + return response; + } + }); + } + + protected RenewDelegationTokenResponse renewDelegationToken(String user, + final Token token) + throws YarnException, IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user) + .doAs(new PrivilegedExceptionAction() { + @Override + public RenewDelegationTokenResponse run() throws Exception { + RenewDelegationTokenRequest req = + RenewDelegationTokenRequest.newInstance(token); + RenewDelegationTokenResponse response = + getRouterClientRMService().renewDelegationToken(req); + return response; + } + }); + } + + protected CancelDelegationTokenResponse cancelDelegationToken(String user, + final Token token) + throws YarnException, IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user) + .doAs(new PrivilegedExceptionAction() { + @Override + public CancelDelegationTokenResponse run() throws Exception { + CancelDelegationTokenRequest req = + CancelDelegationTokenRequest.newInstance(token); + CancelDelegationTokenResponse response = + getRouterClientRMService().cancelDelegationToken(req); + return response; + } + }); + } + + private ReservationSubmissionRequest createSimpleReservationRequest( + int numContainers, long arrival, long deadline, long duration, + ReservationId reservationId) { + // create a request with a single atomic ask + ReservationRequest r = ReservationRequest + .newInstance(Resource.newInstance(1024, 1), numContainers, 1, duration); + ReservationRequests reqs = ReservationRequests.newInstance( + Collections.singletonList(r), ReservationRequestInterpreter.R_ALL); + ReservationDefinition rDef = ReservationDefinition.newInstance(arrival, + deadline, reqs, "testRouterClientRMService#reservation"); + ReservationSubmissionRequest request = ReservationSubmissionRequest + .newInstance(rDef, "dedicated", reservationId); + return request; + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/MockClientRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/MockClientRequestInterceptor.java new file mode 100644 index 0000000000..b38703fb54 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/MockClientRequestInterceptor.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.router.clientrm; + +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.MockResourceManagerFacade; + +/** + * This class mocks the ClientRequestInterceptor. + */ +public class MockClientRequestInterceptor + extends DefaultClientRequestInterceptor { + + public void init(String user) { + MockResourceManagerFacade mockRM = new MockResourceManagerFacade( + new YarnConfiguration(super.getConf()), 0); + super.setRMClient(mockRM); + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/PassThroughClientRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/PassThroughClientRequestInterceptor.java new file mode 100644 index 0000000000..c403bd5006 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/PassThroughClientRequestInterceptor.java @@ -0,0 +1,267 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.router.clientrm; + +import java.io.IOException; + +import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationListRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationListResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse; +import org.apache.hadoop.yarn.exceptions.YarnException; + +/** + * Mock intercepter that does not do anything other than forwarding it to the + * next intercepter in the chain. + */ +public class PassThroughClientRequestInterceptor + extends AbstractClientRequestInterceptor { + + @Override + public GetNewApplicationResponse getNewApplication( + GetNewApplicationRequest request) throws YarnException, IOException { + return getNextInterceptor().getNewApplication(request); + } + + @Override + public SubmitApplicationResponse submitApplication( + SubmitApplicationRequest request) throws YarnException, IOException { + return getNextInterceptor().submitApplication(request); + } + + @Override + public KillApplicationResponse forceKillApplication( + KillApplicationRequest request) throws YarnException, IOException { + return getNextInterceptor().forceKillApplication(request); + } + + @Override + public GetClusterMetricsResponse getClusterMetrics( + GetClusterMetricsRequest request) throws YarnException, IOException { + return getNextInterceptor().getClusterMetrics(request); + } + + @Override + public GetClusterNodesResponse getClusterNodes(GetClusterNodesRequest request) + throws YarnException, IOException { + return getNextInterceptor().getClusterNodes(request); + } + + @Override + public GetQueueInfoResponse getQueueInfo(GetQueueInfoRequest request) + throws YarnException, IOException { + return getNextInterceptor().getQueueInfo(request); + } + + @Override + public GetQueueUserAclsInfoResponse getQueueUserAcls( + GetQueueUserAclsInfoRequest request) throws YarnException, IOException { + return getNextInterceptor().getQueueUserAcls(request); + } + + @Override + public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues( + MoveApplicationAcrossQueuesRequest request) + throws YarnException, IOException { + return getNextInterceptor().moveApplicationAcrossQueues(request); + } + + @Override + public GetNewReservationResponse getNewReservation( + GetNewReservationRequest request) throws YarnException, IOException { + return getNextInterceptor().getNewReservation(request); + } + + @Override + public ReservationSubmissionResponse submitReservation( + ReservationSubmissionRequest request) throws YarnException, IOException { + return getNextInterceptor().submitReservation(request); + } + + @Override + public ReservationListResponse listReservations( + ReservationListRequest request) throws YarnException, IOException { + return getNextInterceptor().listReservations(request); + } + + @Override + public ReservationUpdateResponse updateReservation( + ReservationUpdateRequest request) throws YarnException, IOException { + return getNextInterceptor().updateReservation(request); + } + + @Override + public ReservationDeleteResponse deleteReservation( + ReservationDeleteRequest request) throws YarnException, IOException { + return getNextInterceptor().deleteReservation(request); + } + + @Override + public GetNodesToLabelsResponse getNodeToLabels( + GetNodesToLabelsRequest request) throws YarnException, IOException { + return getNextInterceptor().getNodeToLabels(request); + } + + @Override + public GetLabelsToNodesResponse getLabelsToNodes( + GetLabelsToNodesRequest request) throws YarnException, IOException { + return getNextInterceptor().getLabelsToNodes(request); + } + + @Override + public GetClusterNodeLabelsResponse getClusterNodeLabels( + GetClusterNodeLabelsRequest request) throws YarnException, IOException { + return getNextInterceptor().getClusterNodeLabels(request); + } + + @Override + public GetApplicationReportResponse getApplicationReport( + GetApplicationReportRequest request) throws YarnException, IOException { + return getNextInterceptor().getApplicationReport(request); + } + + @Override + public GetApplicationsResponse getApplications(GetApplicationsRequest request) + throws YarnException, IOException { + return getNextInterceptor().getApplications(request); + } + + @Override + public GetApplicationAttemptReportResponse getApplicationAttemptReport( + GetApplicationAttemptReportRequest request) + throws YarnException, IOException { + return getNextInterceptor().getApplicationAttemptReport(request); + } + + @Override + public GetApplicationAttemptsResponse getApplicationAttempts( + GetApplicationAttemptsRequest request) throws YarnException, IOException { + return getNextInterceptor().getApplicationAttempts(request); + } + + @Override + public GetContainerReportResponse getContainerReport( + GetContainerReportRequest request) throws YarnException, IOException { + return getNextInterceptor().getContainerReport(request); + } + + @Override + public GetContainersResponse getContainers(GetContainersRequest request) + throws YarnException, IOException { + return getNextInterceptor().getContainers(request); + } + + @Override + public GetDelegationTokenResponse getDelegationToken( + GetDelegationTokenRequest request) throws YarnException, IOException { + return getNextInterceptor().getDelegationToken(request); + } + + @Override + public RenewDelegationTokenResponse renewDelegationToken( + RenewDelegationTokenRequest request) throws YarnException, IOException { + return getNextInterceptor().renewDelegationToken(request); + } + + @Override + public CancelDelegationTokenResponse cancelDelegationToken( + CancelDelegationTokenRequest request) throws YarnException, IOException { + return getNextInterceptor().cancelDelegationToken(request); + } + + @Override + public FailApplicationAttemptResponse failApplicationAttempt( + FailApplicationAttemptRequest request) throws YarnException, IOException { + return getNextInterceptor().failApplicationAttempt(request); + } + + @Override + public UpdateApplicationPriorityResponse updateApplicationPriority( + UpdateApplicationPriorityRequest request) + throws YarnException, IOException { + return getNextInterceptor().updateApplicationPriority(request); + } + + @Override + public SignalContainerResponse signalToContainer( + SignalContainerRequest request) throws YarnException, IOException { + return getNextInterceptor().signalToContainer(request); + } + + @Override + public UpdateApplicationTimeoutsResponse updateApplicationTimeouts( + UpdateApplicationTimeoutsRequest request) + throws YarnException, IOException { + return getNextInterceptor().updateApplicationTimeouts(request); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestRouterClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestRouterClientRMService.java new file mode 100644 index 0000000000..a9c37293f6 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestRouterClientRMService.java @@ -0,0 +1,210 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.router.clientrm; + +import java.io.IOException; +import java.util.Map; + +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.router.clientrm.RouterClientRMService.RequestInterceptorChainWrapper; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Test class to validate the ClientRM Service inside the Router. + */ +public class TestRouterClientRMService extends BaseRouterClientRMTest { + + private static final Logger LOG = + LoggerFactory.getLogger(TestRouterClientRMService.class); + + /** + * Tests if the pipeline is created properly. + */ + @Test + public void testRequestInterceptorChainCreation() throws Exception { + ClientRequestInterceptor root = + super.getRouterClientRMService().createRequestInterceptorChain(); + int index = 0; + while (root != null) { + // The current pipeline is: + // PassThroughClientRequestInterceptor - index = 0 + // PassThroughClientRequestInterceptor - index = 1 + // PassThroughClientRequestInterceptor - index = 2 + // MockClientRequestInterceptor - index = 3 + switch (index) { + case 0: // Fall to the next case + case 1: // Fall to the next case + case 2: + // If index is equal to 0,1 or 2 we fall in this check + Assert.assertEquals(PassThroughClientRequestInterceptor.class.getName(), + root.getClass().getName()); + break; + case 3: + Assert.assertEquals(MockClientRequestInterceptor.class.getName(), + root.getClass().getName()); + break; + default: + Assert.fail(); + } + root = root.getNextInterceptor(); + index++; + } + Assert.assertEquals("The number of interceptors in chain does not match", 4, + index); + } + + /** + * Test if the RouterClientRM forwards all the requests to the MockRM and get + * back the responses. + */ + @Test + public void testRouterClientRMServiceE2E() throws Exception { + + String user = "test1"; + + LOG.info("testRouterClientRMServiceE2E - Get New Application"); + + GetNewApplicationResponse responseGetNewApp = getNewApplication(user); + Assert.assertNotNull(responseGetNewApp); + + LOG.info("testRouterClientRMServiceE2E - Submit Application"); + + SubmitApplicationResponse responseSubmitApp = + submitApplication(responseGetNewApp.getApplicationId(), user); + Assert.assertNotNull(responseSubmitApp); + + LOG.info("testRouterClientRMServiceE2E - Kill Application"); + + KillApplicationResponse responseKillApp = + forceKillApplication(responseGetNewApp.getApplicationId(), user); + Assert.assertNotNull(responseKillApp); + + LOG.info("testRouterClientRMServiceE2E - Get Cluster Metrics"); + + GetClusterMetricsResponse responseGetClusterMetrics = + getClusterMetrics(user); + Assert.assertNotNull(responseGetClusterMetrics); + + LOG.info("testRouterClientRMServiceE2E - Get Cluster Nodes"); + + GetClusterNodesResponse responseGetClusterNodes = getClusterNodes(user); + Assert.assertNotNull(responseGetClusterNodes); + + LOG.info("testRouterClientRMServiceE2E - Get Queue Info"); + + GetQueueInfoResponse responseGetQueueInfo = getQueueInfo(user); + Assert.assertNotNull(responseGetQueueInfo); + + LOG.info("testRouterClientRMServiceE2E - Get Queue User"); + + GetQueueUserAclsInfoResponse responseGetQueueUser = getQueueUserAcls(user); + Assert.assertNotNull(responseGetQueueUser); + + LOG.info("testRouterClientRMServiceE2E - Get Cluster Node"); + + GetClusterNodeLabelsResponse responseGetClusterNode = + getClusterNodeLabels(user); + Assert.assertNotNull(responseGetClusterNode); + + LOG.info("testRouterClientRMServiceE2E - Move Application Across Queues"); + + MoveApplicationAcrossQueuesResponse responseMoveApp = + moveApplicationAcrossQueues(user, responseGetNewApp.getApplicationId()); + Assert.assertNotNull(responseMoveApp); + + LOG.info("testRouterClientRMServiceE2E - Get New Reservation"); + + GetNewReservationResponse getNewReservationResponse = + getNewReservation(user); + + LOG.info("testRouterClientRMServiceE2E - Submit Reservation"); + + ReservationSubmissionResponse responseSubmitReser = + submitReservation(user, getNewReservationResponse.getReservationId()); + Assert.assertNotNull(responseSubmitReser); + + LOG.info("testRouterClientRMServiceE2E - Update Reservation"); + + ReservationUpdateResponse responseUpdateReser = + updateReservation(user, getNewReservationResponse.getReservationId()); + Assert.assertNotNull(responseUpdateReser); + + LOG.info("testRouterClientRMServiceE2E - Delete Reservation"); + + ReservationDeleteResponse responseDeleteReser = + deleteReservation(user, getNewReservationResponse.getReservationId()); + Assert.assertNotNull(responseDeleteReser); + } + + /** + * Test if the different chains for users are generated, and LRU cache is + * working as expected. + */ + @Test + public void testUsersChainMapWithLRUCache() + throws YarnException, IOException, InterruptedException { + + Map pipelines; + RequestInterceptorChainWrapper chain; + + getNewApplication("test1"); + getNewApplication("test2"); + getNewApplication("test3"); + getNewApplication("test4"); + getNewApplication("test5"); + getNewApplication("test6"); + getNewApplication("test7"); + getNewApplication("test8"); + + pipelines = super.getRouterClientRMService().getPipelines(); + Assert.assertEquals(8, pipelines.size()); + + getNewApplication("test9"); + getNewApplication("test10"); + getNewApplication("test1"); + getNewApplication("test11"); + + // The cache max size is defined in + // BaseRouterClientRMTest.TEST_MAX_CACHE_SIZE + Assert.assertEquals(10, pipelines.size()); + + chain = pipelines.get("test1"); + Assert.assertNotNull("test1 should not be evicted", chain); + + chain = pipelines.get("test2"); + Assert.assertNull("test2 should have been evicted", chain); + } + +}