From 2c03524fa4be754aa95889d4ac0f5d57dca8cda8 Mon Sep 17 00:00:00 2001 From: Brahma Reddy Battula Date: Fri, 26 Jun 2020 20:43:27 +0530 Subject: [PATCH] YARN-6526. Refactoring SQLFederationStateStore by avoiding to recreate a connection at every call. COntributed by Bilwa S T. --- .../store/impl/SQLFederationStateStore.java | 124 ++++++++---------- .../FederationStateStoreClientMetrics.java | 18 +++ .../utils/FederationStateStoreUtils.java | 14 ++ .../impl/FederationStateStoreBaseTest.java | 15 ++- .../impl/HSQLDBFederationStateStore.java | 3 +- .../impl/TestSQLFederationStateStore.java | 28 ++++ .../TestZookeeperFederationStateStore.java | 4 +- 7 files changed, 130 insertions(+), 76 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java index 07dc7e4799..8ceef4310f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java @@ -78,6 +78,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.annotations.VisibleForTesting; import com.zaxxer.hikari.HikariDataSource; /** @@ -141,6 +142,8 @@ public class SQLFederationStateStore implements FederationStateStore { private int maximumPoolSize; private HikariDataSource dataSource = null; private final Clock clock = new MonotonicClock(); + @VisibleForTesting + Connection conn = null; @Override public void init(Configuration conf) throws YarnException { @@ -173,6 +176,13 @@ public void init(Configuration conf) throws YarnException { dataSource.setMaximumPoolSize(maximumPoolSize); LOG.info("Initialized connection pool to the Federation StateStore " + "database at address: " + url); + try { + conn = getConnection(); + LOG.debug("Connection created"); + } catch (SQLException e) { + FederationStateStoreUtils.logAndThrowRetriableException(LOG, + "Not able to get Connection", e); + } } @Override @@ -185,15 +195,13 @@ public SubClusterRegisterResponse registerSubCluster( .validate(registerSubClusterRequest); CallableStatement cstmt = null; - Connection conn = null; SubClusterInfo subClusterInfo = registerSubClusterRequest.getSubClusterInfo(); SubClusterId subClusterId = subClusterInfo.getSubClusterId(); try { - conn = getConnection(); - cstmt = conn.prepareCall(CALL_SP_REGISTER_SUBCLUSTER); + cstmt = getCallableStatement(CALL_SP_REGISTER_SUBCLUSTER); // Set the parameters for the stored procedure cstmt.setString(1, subClusterId.getId()); @@ -238,9 +246,10 @@ public SubClusterRegisterResponse registerSubCluster( + " into the StateStore", e); } finally { - // Return to the pool the CallableStatement and the Connection - FederationStateStoreUtils.returnToPool(LOG, cstmt, conn); + // Return to the pool the CallableStatement + FederationStateStoreUtils.returnToPool(LOG, cstmt); } + return SubClusterRegisterResponse.newInstance(); } @@ -254,14 +263,12 @@ public SubClusterDeregisterResponse deregisterSubCluster( .validate(subClusterDeregisterRequest); CallableStatement cstmt = null; - Connection conn = null; SubClusterId subClusterId = subClusterDeregisterRequest.getSubClusterId(); SubClusterState state = subClusterDeregisterRequest.getState(); try { - conn = getConnection(); - cstmt = conn.prepareCall(CALL_SP_DEREGISTER_SUBCLUSTER); + cstmt = getCallableStatement(CALL_SP_DEREGISTER_SUBCLUSTER); // Set the parameters for the stored procedure cstmt.setString(1, subClusterId.getId()); @@ -299,8 +306,8 @@ public SubClusterDeregisterResponse deregisterSubCluster( + state.toString(), e); } finally { - // Return to the pool the CallableStatement and the Connection - FederationStateStoreUtils.returnToPool(LOG, cstmt, conn); + // Return to the pool the CallableStatement + FederationStateStoreUtils.returnToPool(LOG, cstmt); } return SubClusterDeregisterResponse.newInstance(); } @@ -315,14 +322,12 @@ public SubClusterHeartbeatResponse subClusterHeartbeat( .validate(subClusterHeartbeatRequest); CallableStatement cstmt = null; - Connection conn = null; SubClusterId subClusterId = subClusterHeartbeatRequest.getSubClusterId(); SubClusterState state = subClusterHeartbeatRequest.getState(); try { - conn = getConnection(); - cstmt = conn.prepareCall(CALL_SP_SUBCLUSTER_HEARTBEAT); + cstmt = getCallableStatement(CALL_SP_SUBCLUSTER_HEARTBEAT); // Set the parameters for the stored procedure cstmt.setString(1, subClusterId.getId()); @@ -362,8 +367,8 @@ public SubClusterHeartbeatResponse subClusterHeartbeat( + subClusterId, e); } finally { - // Return to the pool the CallableStatement and the Connection - FederationStateStoreUtils.returnToPool(LOG, cstmt, conn); + // Return to the pool the CallableStatement + FederationStateStoreUtils.returnToPool(LOG, cstmt); } return SubClusterHeartbeatResponse.newInstance(); } @@ -376,14 +381,12 @@ public GetSubClusterInfoResponse getSubCluster( FederationMembershipStateStoreInputValidator.validate(subClusterRequest); CallableStatement cstmt = null; - Connection conn = null; SubClusterInfo subClusterInfo = null; SubClusterId subClusterId = subClusterRequest.getSubClusterId(); try { - conn = getConnection(); - cstmt = conn.prepareCall(CALL_SP_GET_SUBCLUSTER); + cstmt = getCallableStatement(CALL_SP_GET_SUBCLUSTER); cstmt.setString(1, subClusterId.getId()); // Set the parameters for the stored procedure @@ -443,8 +446,8 @@ public GetSubClusterInfoResponse getSubCluster( FederationStateStoreUtils.logAndThrowRetriableException(LOG, "Unable to obtain the SubCluster information for " + subClusterId, e); } finally { - // Return to the pool the CallableStatement and the Connection - FederationStateStoreUtils.returnToPool(LOG, cstmt, conn); + // Return to the pool the CallableStatement + FederationStateStoreUtils.returnToPool(LOG, cstmt); } return GetSubClusterInfoResponse.newInstance(subClusterInfo); } @@ -453,13 +456,11 @@ public GetSubClusterInfoResponse getSubCluster( public GetSubClustersInfoResponse getSubClusters( GetSubClustersInfoRequest subClustersRequest) throws YarnException { CallableStatement cstmt = null; - Connection conn = null; ResultSet rs = null; List subClusters = new ArrayList(); try { - conn = getConnection(); - cstmt = conn.prepareCall(CALL_SP_GET_SUBCLUSTERS); + cstmt = getCallableStatement(CALL_SP_GET_SUBCLUSTERS); // Execute the query long startTime = clock.getTime(); @@ -510,8 +511,8 @@ public GetSubClustersInfoResponse getSubClusters( FederationStateStoreUtils.logAndThrowRetriableException(LOG, "Unable to obtain the information for all the SubClusters ", e); } finally { - // Return to the pool the CallableStatement and the Connection - FederationStateStoreUtils.returnToPool(LOG, cstmt, conn, rs); + // Return to the pool the CallableStatement + FederationStateStoreUtils.returnToPool(LOG, cstmt, null, rs); } return GetSubClustersInfoResponse.newInstance(subClusters); } @@ -524,7 +525,6 @@ public AddApplicationHomeSubClusterResponse addApplicationHomeSubCluster( FederationApplicationHomeSubClusterStoreInputValidator.validate(request); CallableStatement cstmt = null; - Connection conn = null; String subClusterHome = null; ApplicationId appId = @@ -533,8 +533,7 @@ public AddApplicationHomeSubClusterResponse addApplicationHomeSubCluster( request.getApplicationHomeSubCluster().getHomeSubCluster(); try { - conn = getConnection(); - cstmt = conn.prepareCall(CALL_SP_ADD_APPLICATION_HOME_SUBCLUSTER); + cstmt = getCallableStatement(CALL_SP_ADD_APPLICATION_HOME_SUBCLUSTER); // Set the parameters for the stored procedure cstmt.setString(1, appId.toString()); @@ -596,8 +595,8 @@ public AddApplicationHomeSubClusterResponse addApplicationHomeSubCluster( + request.getApplicationHomeSubCluster().getApplicationId(), e); } finally { - // Return to the pool the CallableStatement and the Connection - FederationStateStoreUtils.returnToPool(LOG, cstmt, conn); + // Return to the pool the CallableStatement + FederationStateStoreUtils.returnToPool(LOG, cstmt); } return AddApplicationHomeSubClusterResponse .newInstance(SubClusterId.newInstance(subClusterHome)); @@ -611,7 +610,6 @@ public UpdateApplicationHomeSubClusterResponse updateApplicationHomeSubCluster( FederationApplicationHomeSubClusterStoreInputValidator.validate(request); CallableStatement cstmt = null; - Connection conn = null; ApplicationId appId = request.getApplicationHomeSubCluster().getApplicationId(); @@ -619,8 +617,7 @@ public UpdateApplicationHomeSubClusterResponse updateApplicationHomeSubCluster( request.getApplicationHomeSubCluster().getHomeSubCluster(); try { - conn = getConnection(); - cstmt = conn.prepareCall(CALL_SP_UPDATE_APPLICATION_HOME_SUBCLUSTER); + cstmt = getCallableStatement(CALL_SP_UPDATE_APPLICATION_HOME_SUBCLUSTER); // Set the parameters for the stored procedure cstmt.setString(1, appId.toString()); @@ -660,8 +657,8 @@ public UpdateApplicationHomeSubClusterResponse updateApplicationHomeSubCluster( + request.getApplicationHomeSubCluster().getApplicationId(), e); } finally { - // Return to the pool the CallableStatement and the Connection - FederationStateStoreUtils.returnToPool(LOG, cstmt, conn); + // Return to the pool the CallableStatement + FederationStateStoreUtils.returnToPool(LOG, cstmt); } return UpdateApplicationHomeSubClusterResponse.newInstance(); } @@ -673,13 +670,11 @@ public GetApplicationHomeSubClusterResponse getApplicationHomeSubCluster( FederationApplicationHomeSubClusterStoreInputValidator.validate(request); CallableStatement cstmt = null; - Connection conn = null; SubClusterId homeRM = null; try { - conn = getConnection(); - cstmt = conn.prepareCall(CALL_SP_GET_APPLICATION_HOME_SUBCLUSTER); + cstmt = getCallableStatement(CALL_SP_GET_APPLICATION_HOME_SUBCLUSTER); // Set the parameters for the stored procedure cstmt.setString(1, request.getApplicationId().toString()); @@ -711,9 +706,8 @@ public GetApplicationHomeSubClusterResponse getApplicationHomeSubCluster( + "for the specified application " + request.getApplicationId(), e); } finally { - - // Return to the pool the CallableStatement and the Connection - FederationStateStoreUtils.returnToPool(LOG, cstmt, conn); + // Return to the pool the CallableStatement + FederationStateStoreUtils.returnToPool(LOG, cstmt); } return GetApplicationHomeSubClusterResponse .newInstance(ApplicationHomeSubCluster @@ -724,14 +718,12 @@ public GetApplicationHomeSubClusterResponse getApplicationHomeSubCluster( public GetApplicationsHomeSubClusterResponse getApplicationsHomeSubCluster( GetApplicationsHomeSubClusterRequest request) throws YarnException { CallableStatement cstmt = null; - Connection conn = null; ResultSet rs = null; List appsHomeSubClusters = new ArrayList(); try { - conn = getConnection(); - cstmt = conn.prepareCall(CALL_SP_GET_APPLICATIONS_HOME_SUBCLUSTER); + cstmt = getCallableStatement(CALL_SP_GET_APPLICATIONS_HOME_SUBCLUSTER); // Execute the query long startTime = clock.getTime(); @@ -757,8 +749,8 @@ public GetApplicationsHomeSubClusterResponse getApplicationsHomeSubCluster( FederationStateStoreUtils.logAndThrowRetriableException(LOG, "Unable to obtain the information for all the applications ", e); } finally { - // Return to the pool the CallableStatement and the Connection - FederationStateStoreUtils.returnToPool(LOG, cstmt, conn, rs); + // Return to the pool the CallableStatement + FederationStateStoreUtils.returnToPool(LOG, cstmt, null, rs); } return GetApplicationsHomeSubClusterResponse .newInstance(appsHomeSubClusters); @@ -772,11 +764,9 @@ public DeleteApplicationHomeSubClusterResponse deleteApplicationHomeSubCluster( FederationApplicationHomeSubClusterStoreInputValidator.validate(request); CallableStatement cstmt = null; - Connection conn = null; try { - conn = getConnection(); - cstmt = conn.prepareCall(CALL_SP_DELETE_APPLICATION_HOME_SUBCLUSTER); + cstmt = getCallableStatement(CALL_SP_DELETE_APPLICATION_HOME_SUBCLUSTER); // Set the parameters for the stored procedure cstmt.setString(1, request.getApplicationId().toString()); @@ -812,8 +802,8 @@ public DeleteApplicationHomeSubClusterResponse deleteApplicationHomeSubCluster( FederationStateStoreUtils.logAndThrowRetriableException(LOG, "Unable to delete the application " + request.getApplicationId(), e); } finally { - // Return to the pool the CallableStatement and the Connection - FederationStateStoreUtils.returnToPool(LOG, cstmt, conn); + // Return to the pool the CallableStatement + FederationStateStoreUtils.returnToPool(LOG, cstmt); } return DeleteApplicationHomeSubClusterResponse.newInstance(); } @@ -826,12 +816,10 @@ public GetSubClusterPolicyConfigurationResponse getPolicyConfiguration( FederationPolicyStoreInputValidator.validate(request); CallableStatement cstmt = null; - Connection conn = null; SubClusterPolicyConfiguration subClusterPolicyConfiguration = null; try { - conn = getConnection(); - cstmt = conn.prepareCall(CALL_SP_GET_POLICY_CONFIGURATION); + cstmt = getCallableStatement(CALL_SP_GET_POLICY_CONFIGURATION); // Set the parameters for the stored procedure cstmt.setString(1, request.getQueue()); @@ -864,8 +852,8 @@ public GetSubClusterPolicyConfigurationResponse getPolicyConfiguration( "Unable to select the policy for the queue :" + request.getQueue(), e); } finally { - // Return to the pool the CallableStatement and the Connection - FederationStateStoreUtils.returnToPool(LOG, cstmt, conn); + // Return to the pool the CallableStatement + FederationStateStoreUtils.returnToPool(LOG, cstmt); } return GetSubClusterPolicyConfigurationResponse .newInstance(subClusterPolicyConfiguration); @@ -879,13 +867,11 @@ public SetSubClusterPolicyConfigurationResponse setPolicyConfiguration( FederationPolicyStoreInputValidator.validate(request); CallableStatement cstmt = null; - Connection conn = null; SubClusterPolicyConfiguration policyConf = request.getPolicyConfiguration(); try { - conn = getConnection(); - cstmt = conn.prepareCall(CALL_SP_SET_POLICY_CONFIGURATION); + cstmt = getCallableStatement(CALL_SP_SET_POLICY_CONFIGURATION); // Set the parameters for the stored procedure cstmt.setString(1, policyConf.getQueue()); @@ -925,8 +911,8 @@ public SetSubClusterPolicyConfigurationResponse setPolicyConfiguration( + policyConf.getQueue(), e); } finally { - // Return to the pool the CallableStatement and the Connection - FederationStateStoreUtils.returnToPool(LOG, cstmt, conn); + // Return to the pool the CallableStatement + FederationStateStoreUtils.returnToPool(LOG, cstmt); } return SetSubClusterPolicyConfigurationResponse.newInstance(); } @@ -936,14 +922,12 @@ public GetSubClusterPoliciesConfigurationsResponse getPoliciesConfigurations( GetSubClusterPoliciesConfigurationsRequest request) throws YarnException { CallableStatement cstmt = null; - Connection conn = null; ResultSet rs = null; List policyConfigurations = new ArrayList(); try { - conn = getConnection(); - cstmt = conn.prepareCall(CALL_SP_GET_POLICIES_CONFIGURATIONS); + cstmt = getCallableStatement(CALL_SP_GET_POLICIES_CONFIGURATIONS); // Execute the query long startTime = clock.getTime(); @@ -971,8 +955,8 @@ public GetSubClusterPoliciesConfigurationsResponse getPoliciesConfigurations( FederationStateStoreUtils.logAndThrowRetriableException(LOG, "Unable to obtain the policy information for all the queues.", e); } finally { - // Return to the pool the CallableStatement and the Connection - FederationStateStoreUtils.returnToPool(LOG, cstmt, conn, rs); + // Return to the pool the CallableStatement + FederationStateStoreUtils.returnToPool(LOG, cstmt, null, rs); } return GetSubClusterPoliciesConfigurationsResponse @@ -993,6 +977,8 @@ public Version loadVersion() { public void close() throws Exception { if (dataSource != null) { dataSource.close(); + LOG.debug("Connection closed"); + FederationStateStoreClientMetrics.decrConnections(); } } @@ -1003,9 +989,15 @@ public void close() throws Exception { * @throws SQLException on failure */ public Connection getConnection() throws SQLException { + FederationStateStoreClientMetrics.incrConnections(); return dataSource.getConnection(); } + private CallableStatement getCallableStatement(String procedure) + throws SQLException { + return conn.prepareCall(procedure); + } + private static byte[] getByteArray(ByteBuffer bb) { byte[] ba = new byte[bb.limit()]; bb.get(ba); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/metrics/FederationStateStoreClientMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/metrics/FederationStateStoreClientMetrics.java index 27b46cde8e..d04f850b5d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/metrics/FederationStateStoreClientMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/metrics/FederationStateStoreClientMetrics.java @@ -31,6 +31,7 @@ import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.metrics2.lib.MetricsRegistry; import org.apache.hadoop.metrics2.lib.MutableCounterLong; +import org.apache.hadoop.metrics2.lib.MutableGaugeInt; import org.apache.hadoop.metrics2.lib.MutableQuantiles; import org.apache.hadoop.metrics2.lib.MutableRate; import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; @@ -80,6 +81,9 @@ public final class FederationStateStoreClientMetrics implements MetricsSource { @Metric("Total number of failed StateStore calls") private static MutableCounterLong totalFailedCalls; + @Metric("Total number of Connections") + private static MutableGaugeInt totalConnections; + // This after the static members are initialized, or the constructor will // throw a NullPointerException private static final FederationStateStoreClientMetrics S_INSTANCE = @@ -146,6 +150,14 @@ public static void succeededStateStoreCall(long duration) { methodQuantileMetric.add(duration); } + public static void incrConnections() { + totalConnections.incr(); + } + + public static void decrConnections() { + totalConnections.decr(); + } + @Override public void getMetrics(MetricsCollector collector, boolean all) { REGISTRY.snapshot(collector.addRecord(REGISTRY.info()), all); @@ -181,4 +193,10 @@ static long getNumSucceededCalls() { static double getLatencySucceededCalls() { return totalSucceededCalls.lastStat().mean(); } + + @VisibleForTesting + public static int getNumConnections() { + return totalConnections.value(); + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationStateStoreUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationStateStoreUtils.java index 3b870debef..27a4f7dba5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationStateStoreUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationStateStoreUtils.java @@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreException; import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreInvalidInputException; import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreRetriableException; +import org.apache.hadoop.yarn.server.federation.store.metrics.FederationStateStoreClientMetrics; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -69,6 +70,7 @@ public static void returnToPool(Logger log, CallableStatement cstmt, if (conn != null) { try { conn.close(); + FederationStateStoreClientMetrics.decrConnections(); } catch (SQLException e) { logAndThrowException(log, "Exception while trying to close Connection", e); @@ -98,6 +100,18 @@ public static void returnToPool(Logger log, CallableStatement cstmt, returnToPool(log, cstmt, conn, null); } + /** + * Returns the SQL FederationStateStore connections to the pool. + * + * @param log the logger interface + * @param cstmt the interface used to execute SQL stored procedures + * @throws YarnException on failure + */ + public static void returnToPool(Logger log, CallableStatement cstmt) + throws YarnException { + returnToPool(log, cstmt, null); + } + /** * Throws an exception due to an error in FederationStateStore. * diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java index b17f8702a9..d0e6485b02 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java @@ -68,7 +68,7 @@ public abstract class FederationStateStoreBaseTest { private static final MonotonicClock CLOCK = new MonotonicClock(); - private FederationStateStore stateStore = createStateStore(); + private FederationStateStore stateStore; protected abstract FederationStateStore createStateStore(); @@ -76,6 +76,7 @@ public abstract class FederationStateStoreBaseTest { @Before public void before() throws IOException, YarnException { + stateStore = createStateStore(); stateStore.init(conf); } @@ -516,7 +517,7 @@ public void testGetPoliciesConfigurations() throws Exception { // Convenience methods - private SubClusterInfo createSubClusterInfo(SubClusterId subClusterId) { + SubClusterInfo createSubClusterInfo(SubClusterId subClusterId) { String amRMAddress = "1.2.3.4:1"; String clientRMAddress = "1.2.3.4:2"; @@ -535,7 +536,7 @@ private SubClusterPolicyConfiguration createSCPolicyConf(String queueName, return SubClusterPolicyConfiguration.newInstance(queueName, policyType, bb); } - private void addApplicationHomeSC(ApplicationId appId, + void addApplicationHomeSC(ApplicationId appId, SubClusterId subClusterId) throws YarnException { ApplicationHomeSubCluster ahsc = ApplicationHomeSubCluster.newInstance(appId, subClusterId); @@ -558,14 +559,14 @@ private void registerSubCluster(SubClusterInfo subClusterInfo) SubClusterRegisterRequest.newInstance(subClusterInfo)); } - private SubClusterInfo querySubClusterInfo(SubClusterId subClusterId) + SubClusterInfo querySubClusterInfo(SubClusterId subClusterId) throws YarnException { GetSubClusterInfoRequest request = GetSubClusterInfoRequest.newInstance(subClusterId); return stateStore.getSubCluster(request).getSubClusterInfo(); } - private SubClusterId queryApplicationHomeSC(ApplicationId appId) + SubClusterId queryApplicationHomeSC(ApplicationId appId) throws YarnException { GetApplicationHomeSubClusterRequest request = GetApplicationHomeSubClusterRequest.newInstance(appId); @@ -594,4 +595,8 @@ protected Configuration getConf() { return conf; } + protected FederationStateStore getStateStore() { + return stateStore; + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/HSQLDBFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/HSQLDBFederationStateStore.java index 289a3a6112..c3d0a9e1bb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/HSQLDBFederationStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/HSQLDBFederationStateStore.java @@ -209,7 +209,7 @@ public void init(Configuration conf) { LOG.error("ERROR: failed to init HSQLDB " + e1.getMessage()); } try { - conn = getConnection(); + conn = super.conn; LOG.info("Database Init: Start"); @@ -234,7 +234,6 @@ public void init(Configuration conf) { conn.prepareStatement(SP_GETPOLICIESCONFIGURATIONS).execute(); LOG.info("Database Init: Complete"); - conn.close(); } catch (SQLException e) { LOG.error("ERROR: failed to inizialize HSQLDB " + e.getMessage()); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestSQLFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestSQLFederationStateStore.java index d4e6cc53f6..3c1d327b39 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestSQLFederationStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestSQLFederationStateStore.java @@ -17,8 +17,16 @@ package org.apache.hadoop.yarn.server.federation.store.impl; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; +import org.apache.hadoop.yarn.server.federation.store.metrics.FederationStateStoreClientMetrics; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest; +import org.junit.Assert; +import org.junit.Test; /** * Unit tests for SQLFederationStateStore. @@ -46,4 +54,24 @@ protected FederationStateStore createStateStore() { super.setConf(conf); return new HSQLDBFederationStateStore(); } + + @Test + public void testSqlConnectionsCreatedCount() throws YarnException { + FederationStateStore stateStore = getStateStore(); + SubClusterId subClusterId = SubClusterId.newInstance("SC"); + ApplicationId appId = ApplicationId.newInstance(1, 1); + + SubClusterInfo subClusterInfo = createSubClusterInfo(subClusterId); + + stateStore.registerSubCluster( + SubClusterRegisterRequest.newInstance(subClusterInfo)); + Assert.assertEquals(subClusterInfo, querySubClusterInfo(subClusterId)); + + addApplicationHomeSC(appId, subClusterId); + Assert.assertEquals(subClusterId, queryApplicationHomeSC(appId)); + + // Verify if connection is created only once at statestore init + Assert.assertEquals(1, + FederationStateStoreClientMetrics.getNumConnections()); + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestZookeeperFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestZookeeperFederationStateStore.java index 390b8037b1..fe28641eb2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestZookeeperFederationStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestZookeeperFederationStateStore.java @@ -72,7 +72,6 @@ public void before() throws IOException, YarnException { @After public void after() throws Exception { super.after(); - curatorFramework.close(); try { curatorTestingServer.stop(); @@ -82,8 +81,7 @@ public void after() throws Exception { @Override protected FederationStateStore createStateStore() { - Configuration conf = new Configuration(); - super.setConf(conf); + super.setConf(getConf()); return new ZookeeperFederationStateStore(); } } \ No newline at end of file