From 48b6f9f33577cbf28d89f1c29376fc58e756cdfd Mon Sep 17 00:00:00 2001 From: slfan1989 <55643692+slfan1989@users.noreply.github.com> Date: Thu, 20 Oct 2022 07:11:28 +0800 Subject: [PATCH] YARN-11328. Refactoring part of the code of SQLFederationStateStore. (#4976) --- .../FederationStateStoreStoredProcs.sql | 242 ++++----- .../store/impl/SQLFederationStateStore.java | 463 ++++++++---------- .../utils/FederationStateStoreUtils.java | 23 + .../impl/HSQLDBFederationStateStore.java | 4 +- .../impl/TestSQLFederationStateStore.java | 6 +- 5 files changed, 351 insertions(+), 387 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/SQLServer/FederationStateStoreStoredProcs.sql b/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/SQLServer/FederationStateStoreStoredProcs.sql index 17f9e96909..cc8a79d627 100644 --- a/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/SQLServer/FederationStateStoreStoredProcs.sql +++ b/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/SQLServer/FederationStateStoreStoredProcs.sql @@ -24,10 +24,10 @@ IF OBJECT_ID ( '[sp_addApplicationHomeSubCluster]', 'P' ) IS NOT NULL GO CREATE PROCEDURE [dbo].[sp_addApplicationHomeSubCluster] - @applicationId VARCHAR(64), - @homeSubCluster VARCHAR(256), - @storedHomeSubCluster VARCHAR(256) OUTPUT, - @rowCount int OUTPUT + @applicationId_IN VARCHAR(64), + @homeSubCluster_IN VARCHAR(256), + @storedHomeSubCluster_OUT VARCHAR(256) OUTPUT, + @rowCount_OUT int OUTPUT AS BEGIN DECLARE @errorMessage nvarchar(4000) @@ -37,21 +37,21 @@ AS BEGIN -- Otherwise don't change the current mapping. IF NOT EXISTS (SELECT TOP 1 * FROM [dbo].[applicationsHomeSubCluster] - WHERE [applicationId] = @applicationId) + WHERE [applicationId] = @applicationId_IN) INSERT INTO [dbo].[applicationsHomeSubCluster] ( [applicationId], [homeSubCluster]) VALUES ( - @applicationId, - @homeSubCluster); + @applicationId_IN, + @homeSubCluster_IN); -- End of the IF block - SELECT @rowCount = @@ROWCOUNT; + SELECT @rowCount_OUT = @@ROWCOUNT; - SELECT @storedHomeSubCluster = [homeSubCluster] + SELECT @storedHomeSubCluster_OUT = [homeSubCluster] FROM [dbo].[applicationsHomeSubCluster] - WHERE [applicationId] = @applicationId; + WHERE [applicationId] = @applicationId_IN; COMMIT TRAN END TRY @@ -75,9 +75,9 @@ IF OBJECT_ID ( '[sp_updateApplicationHomeSubCluster]', 'P' ) IS NOT NULL GO CREATE PROCEDURE [dbo].[sp_updateApplicationHomeSubCluster] - @applicationId VARCHAR(64), - @homeSubCluster VARCHAR(256), - @rowCount int OUTPUT + @applicationId_IN VARCHAR(64), + @homeSubCluster_IN VARCHAR(256), + @rowCount_OUT int OUTPUT AS BEGIN DECLARE @errorMessage nvarchar(4000) @@ -85,9 +85,9 @@ AS BEGIN BEGIN TRAN UPDATE [dbo].[applicationsHomeSubCluster] - SET [homeSubCluster] = @homeSubCluster - WHERE [applicationId] = @applicationid; - SELECT @rowCount = @@ROWCOUNT; + SET [homeSubCluster] = @homeSubCluster_IN + WHERE [applicationId] = @applicationId_IN; + SELECT @rowCount_OUT = @@ROWCOUNT; COMMIT TRAN END TRY @@ -111,8 +111,8 @@ IF OBJECT_ID ( '[sp_getApplicationsHomeSubCluster]', 'P' ) IS NOT NULL GO CREATE PROCEDURE [dbo].[sp_getApplicationsHomeSubCluster] - @limit int, - @homeSubCluster VARCHAR(256) + @limit_IN int, + @homeSubCluster_IN VARCHAR(256) AS BEGIN DECLARE @errorMessage nvarchar(4000) @@ -128,8 +128,8 @@ AS BEGIN [createTime], row_number() over(order by [createTime] desc) AS app_rank FROM [dbo].[applicationsHomeSubCluster] - WHERE [homeSubCluster] = @homeSubCluster OR @homeSubCluster = '') AS applicationsHomeSubCluster - WHERE app_rank <= @limit; + WHERE [homeSubCluster] = @homeSubCluster_IN OR @homeSubCluster = '') AS applicationsHomeSubCluster + WHERE app_rank <= @limit_IN; END TRY @@ -150,16 +150,16 @@ IF OBJECT_ID ( '[sp_getApplicationHomeSubCluster]', 'P' ) IS NOT NULL GO CREATE PROCEDURE [dbo].[sp_getApplicationHomeSubCluster] - @applicationId VARCHAR(64), - @homeSubCluster VARCHAR(256) OUTPUT + @applicationId_IN VARCHAR(64), + @homeSubCluster_OUT VARCHAR(256) OUTPUT AS BEGIN DECLARE @errorMessage nvarchar(4000) BEGIN TRY - SELECT @homeSubCluster = [homeSubCluster] + SELECT @homeSubCluster_OUT = [homeSubCluster] FROM [dbo].[applicationsHomeSubCluster] - WHERE [applicationId] = @applicationid; + WHERE [applicationId] = @applicationId_IN; END TRY @@ -181,8 +181,8 @@ IF OBJECT_ID ( '[sp_deleteApplicationHomeSubCluster]', 'P' ) IS NOT NULL GO CREATE PROCEDURE [dbo].[sp_deleteApplicationHomeSubCluster] - @applicationId VARCHAR(64), - @rowCount int OUTPUT + @applicationId_IN VARCHAR(64), + @rowCount_OUT int OUTPUT AS BEGIN DECLARE @errorMessage nvarchar(4000) @@ -190,8 +190,8 @@ AS BEGIN BEGIN TRAN DELETE FROM [dbo].[applicationsHomeSubCluster] - WHERE [applicationId] = @applicationId; - SELECT @rowCount = @@ROWCOUNT; + WHERE [applicationId] = @applicationId_IN; + SELECT @rowCount_OUT = @@ROWCOUNT; COMMIT TRAN END TRY @@ -215,15 +215,15 @@ IF OBJECT_ID ( '[sp_registerSubCluster]', 'P' ) IS NOT NULL GO CREATE PROCEDURE [dbo].[sp_registerSubCluster] - @subClusterId VARCHAR(256), - @amRMServiceAddress VARCHAR(256), - @clientRMServiceAddress VARCHAR(256), - @rmAdminServiceAddress VARCHAR(256), - @rmWebServiceAddress VARCHAR(256), - @state VARCHAR(32), - @lastStartTime BIGINT, - @capability VARCHAR(6000), - @rowCount int OUTPUT + @subClusterId_IN VARCHAR(256), + @amRMServiceAddress_IN VARCHAR(256), + @clientRMServiceAddress_IN VARCHAR(256), + @rmAdminServiceAddress_IN VARCHAR(256), + @rmWebServiceAddress_IN VARCHAR(256), + @state_IN VARCHAR(32), + @lastStartTime_IN BIGINT, + @capability_IN VARCHAR(6000), + @rowCount_OUT int OUTPUT AS BEGIN DECLARE @errorMessage nvarchar(4000) @@ -231,7 +231,7 @@ AS BEGIN BEGIN TRAN DELETE FROM [dbo].[membership] - WHERE [subClusterId] = @subClusterId; + WHERE [subClusterId] = @subClusterId_IN; INSERT INTO [dbo].[membership] ( [subClusterId], [amRMServiceAddress], @@ -243,16 +243,16 @@ AS BEGIN [lastStartTime], [capability] ) VALUES ( - @subClusterId, - @amRMServiceAddress, - @clientRMServiceAddress, - @rmAdminServiceAddress, - @rmWebServiceAddress, + @subClusterId_IN, + @amRMServiceAddress_IN, + @clientRMServiceAddress_IN, + @rmAdminServiceAddress_IN, + @rmWebServiceAddress_IN, GETUTCDATE(), - @state, - @lastStartTime, - @capability); - SELECT @rowCount = @@ROWCOUNT; + @state_IN, + @lastStartTime_IN, + @capability_IN); + SELECT @rowCount_OUT = @@ROWCOUNT; COMMIT TRAN END TRY @@ -303,32 +303,32 @@ IF OBJECT_ID ( '[sp_getSubCluster]', 'P' ) IS NOT NULL GO CREATE PROCEDURE [dbo].[sp_getSubCluster] - @subClusterId VARCHAR(256), - @amRMServiceAddress VARCHAR(256) OUTPUT, - @clientRMServiceAddress VARCHAR(256) OUTPUT, - @rmAdminServiceAddress VARCHAR(256) OUTPUT, - @rmWebServiceAddress VARCHAR(256) OUTPUT, - @lastHeartbeat DATETIME2 OUTPUT, - @state VARCHAR(256) OUTPUT, - @lastStartTime BIGINT OUTPUT, - @capability VARCHAR(6000) OUTPUT + @subClusterId_IN VARCHAR(256), + @amRMServiceAddress_OUT VARCHAR(256) OUTPUT, + @clientRMServiceAddress_OUT VARCHAR(256) OUTPUT, + @rmAdminServiceAddress_OUT VARCHAR(256) OUTPUT, + @rmWebServiceAddress_OUT VARCHAR(256) OUTPUT, + @lastHeartBeat_OUT DATETIME2 OUTPUT, + @state_OUT VARCHAR(256) OUTPUT, + @lastStartTime_OUT BIGINT OUTPUT, + @capability_OUT VARCHAR(6000) OUTPUT AS BEGIN DECLARE @errorMessage nvarchar(4000) BEGIN TRY BEGIN TRAN - SELECT @subClusterId = [subClusterId], - @amRMServiceAddress = [amRMServiceAddress], - @clientRMServiceAddress = [clientRMServiceAddress], - @rmAdminServiceAddress = [rmAdminServiceAddress], - @rmWebServiceAddress = [rmWebServiceAddress], - @lastHeartBeat = [lastHeartBeat], - @state = [state], - @lastStartTime = [lastStartTime], - @capability = [capability] + SELECT @subClusterId_IN = [subClusterId], + @amRMServiceAddress_OUT = [amRMServiceAddress], + @clientRMServiceAddress_OUT = [clientRMServiceAddress], + @rmAdminServiceAddress_OUT = [rmAdminServiceAddress], + @rmWebServiceAddress_OUT = [rmWebServiceAddress], + @lastHeartBeat_OUT = [lastHeartBeat], + @state_OUT = [state], + @lastStartTime_OUT = [lastStartTime], + @capability_OUT = [capability] FROM [dbo].[membership] - WHERE [subClusterId] = @subClusterId + WHERE [subClusterId] = @subClusterId_IN COMMIT TRAN END TRY @@ -353,10 +353,10 @@ IF OBJECT_ID ( '[sp_subClusterHeartbeat]', 'P' ) IS NOT NULL GO CREATE PROCEDURE [dbo].[sp_subClusterHeartbeat] - @subClusterId VARCHAR(256), - @state VARCHAR(256), - @capability VARCHAR(6000), - @rowCount int OUTPUT + @subClusterId_IN VARCHAR(256), + @state_IN VARCHAR(256), + @capability_IN VARCHAR(6000), + @rowCount_OUT int OUTPUT AS BEGIN DECLARE @errorMessage nvarchar(4000) @@ -364,11 +364,11 @@ AS BEGIN BEGIN TRAN UPDATE [dbo].[membership] - SET [state] = @state, + SET [state] = @state_IN, [lastHeartbeat] = GETUTCDATE(), - [capability] = @capability - WHERE [subClusterId] = @subClusterId; - SELECT @rowCount = @@ROWCOUNT; + [capability] = @capability_IN + WHERE [subClusterId] = @subClusterId_IN; + SELECT @rowCount_OUT = @@ROWCOUNT; COMMIT TRAN END TRY @@ -392,9 +392,9 @@ IF OBJECT_ID ( '[sp_deregisterSubCluster]', 'P' ) IS NOT NULL GO CREATE PROCEDURE [dbo].[sp_deregisterSubCluster] - @subClusterId VARCHAR(256), - @state VARCHAR(256), - @rowCount int OUTPUT + @subClusterId_IN VARCHAR(256), + @state_IN VARCHAR(256), + @rowCount_OUT int OUTPUT AS BEGIN DECLARE @errorMessage nvarchar(4000) @@ -402,9 +402,9 @@ AS BEGIN BEGIN TRAN UPDATE [dbo].[membership] - SET [state] = @state - WHERE [subClusterId] = @subClusterId; - SELECT @rowCount = @@ROWCOUNT; + SET [state] = @state_IN + WHERE [subClusterId] = @subClusterId_IN; + SELECT @rowCount_OUT = @@ROWCOUNT; COMMIT TRAN END TRY @@ -428,10 +428,10 @@ IF OBJECT_ID ( '[sp_setPolicyConfiguration]', 'P' ) IS NOT NULL GO CREATE PROCEDURE [dbo].[sp_setPolicyConfiguration] - @queue VARCHAR(256), - @policyType VARCHAR(256), - @params VARBINARY(512), - @rowCount int OUTPUT + @queue_IN VARCHAR(256), + @policyType_IN VARCHAR(256), + @params_IN VARBINARY(512), + @rowCount_OUT int OUTPUT AS BEGIN DECLARE @errorMessage nvarchar(4000) @@ -439,16 +439,16 @@ AS BEGIN BEGIN TRAN DELETE FROM [dbo].[policies] - WHERE [queue] = @queue; + WHERE [queue] = @queue_IN; INSERT INTO [dbo].[policies] ( [queue], [policyType], [params]) VALUES ( - @queue, - @policyType, - @params); - SELECT @rowCount = @@ROWCOUNT; + @queue_IN, + @policyType_IN, + @params_IN); + SELECT @rowCount_OUT = @@ROWCOUNT; COMMIT TRAN END TRY @@ -472,18 +472,18 @@ IF OBJECT_ID ( '[sp_getPolicyConfiguration]', 'P' ) IS NOT NULL GO CREATE PROCEDURE [dbo].[sp_getPolicyConfiguration] - @queue VARCHAR(256), - @policyType VARCHAR(256) OUTPUT, - @params VARBINARY(6000) OUTPUT + @queue_IN VARCHAR(256), + @policyType_OUT VARCHAR(256) OUTPUT, + @params_OUT VARBINARY(6000) OUTPUT AS BEGIN DECLARE @errorMessage nvarchar(4000) BEGIN TRY - SELECT @policyType = [policyType], - @params = [params] + SELECT @policyType_OUT = [policyType], + @params_OUT = [params] FROM [dbo].[policies] - WHERE [queue] = @queue + WHERE [queue] = @queue_IN END TRY @@ -524,15 +524,15 @@ AS BEGIN END; GO -IF OBJECT_ID ( '[sp_addApplicationHomeSubCluster]', 'P' ) IS NOT NULL - DROP PROCEDURE [sp_addApplicationHomeSubCluster]; +IF OBJECT_ID ( '[sp_addReservationHomeSubCluster]', 'P' ) IS NOT NULL + DROP PROCEDURE [sp_addReservationHomeSubCluster]; GO CREATE PROCEDURE [dbo].[sp_addReservationHomeSubCluster] - @reservationId VARCHAR(128), - @homeSubCluster VARCHAR(256), - @storedHomeSubCluster VARCHAR(256) OUTPUT, - @rowCount int OUTPUT + @reservationId_IN VARCHAR(128), + @homeSubCluster_IN VARCHAR(256), + @storedHomeSubCluster_OUT VARCHAR(256) OUTPUT, + @rowCount_OUT int OUTPUT AS BEGIN DECLARE @errorMessage nvarchar(4000) @@ -542,21 +542,21 @@ AS BEGIN -- Otherwise don't change the current mapping. IF NOT EXISTS (SELECT TOP 1 * FROM [dbo].[reservationsHomeSubCluster] - WHERE [reservationId] = @reservationId) + WHERE [reservationId] = @reservationId_IN) INSERT INTO [dbo].[reservationsHomeSubCluster] ( [reservationId], [homeSubCluster]) VALUES ( - @reservationId, - @homeSubCluster); + @reservationId_IN, + @homeSubCluster_IN); -- End of the IF block - SELECT @rowCount = @@ROWCOUNT; + SELECT @rowCount_OUT = @@ROWCOUNT; - SELECT @storedHomeSubCluster = [homeSubCluster] + SELECT @storedHomeSubCluster_OUT = [homeSubCluster] FROM [dbo].[reservationsHomeSubCluster] - WHERE [reservationId] = @reservationId; + WHERE [reservationId] = @reservationId_IN; COMMIT TRAN END TRY @@ -580,9 +580,9 @@ IF OBJECT_ID ( '[sp_updateReservationHomeSubCluster]', 'P' ) IS NOT NULL GO CREATE PROCEDURE [dbo].[sp_updateReservationHomeSubCluster] - @reservationId VARCHAR(128), - @homeSubCluster VARCHAR(256), - @rowCount int OUTPUT + @reservationId_IN VARCHAR(128), + @homeSubCluster_IN VARCHAR(256), + @rowCount_OUT int OUTPUT AS BEGIN DECLARE @errorMessage nvarchar(4000) @@ -590,9 +590,9 @@ AS BEGIN BEGIN TRAN UPDATE [dbo].[reservationsHomeSubCluster] - SET [homeSubCluster] = @homeSubCluster - WHERE [reservationId] = @reservationId; - SELECT @rowCount = @@ROWCOUNT; + SET [homeSubCluster] = @homeSubCluster_IN + WHERE [reservationId] = @reservationId_IN; + SELECT @rowCount_OUT = @@ROWCOUNT; COMMIT TRAN END TRY @@ -641,16 +641,16 @@ IF OBJECT_ID ( '[sp_getReservationHomeSubCluster]', 'P' ) IS NOT NULL GO CREATE PROCEDURE [dbo].[sp_getReservationHomeSubCluster] - @reservationId VARCHAR(128), - @homeSubCluster VARCHAR(256) OUTPUT + @reservationId_IN VARCHAR(128), + @homeSubCluster_OUT VARCHAR(256) OUTPUT AS BEGIN DECLARE @errorMessage nvarchar(4000) BEGIN TRY - SELECT @homeSubCluster = [homeSubCluster] + SELECT @homeSubCluster_OUT = [homeSubCluster] FROM [dbo].[reservationsHomeSubCluster] - WHERE [reservationId] = @reservationId; + WHERE [reservationId] = @reservationId_IN; END TRY @@ -672,8 +672,8 @@ IF OBJECT_ID ( '[sp_deleteReservationHomeSubCluster]', 'P' ) IS NOT NULL GO CREATE PROCEDURE [dbo].[sp_deleteReservationHomeSubCluster] - @reservationId VARCHAR(128), - @rowCount int OUTPUT + @reservationId_IN VARCHAR(128), + @rowCount_OUT int OUTPUT AS BEGIN DECLARE @errorMessage nvarchar(4000) @@ -681,8 +681,8 @@ AS BEGIN BEGIN TRAN DELETE FROM [dbo].[reservationsHomeSubCluster] - WHERE [reservationId] = @reservationId; - SELECT @rowCount = @@ROWCOUNT; + WHERE [reservationId] = @reservationId_IN; + SELECT @rowCount_OUT = @@ROWCOUNT; COMMIT TRAN END TRY diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java index d7b5d27324..2bf2658944 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java @@ -177,7 +177,7 @@ public class SQLFederationStateStore implements FederationStateStore { private HikariDataSource dataSource = null; private final Clock clock = new MonotonicClock(); @VisibleForTesting - Connection conn = null; + private Connection conn = null; private int maxAppsInStateStore; @Override @@ -197,8 +197,7 @@ public void init(Configuration conf) throws YarnException { try { Class.forName(driverClass); } catch (ClassNotFoundException e) { - FederationStateStoreUtils.logAndThrowException(LOG, - "Driver class not found.", e); + FederationStateStoreUtils.logAndThrowException(LOG, "Driver class not found.", e); } // Create the data source to pool connections in a thread-safe manner @@ -209,14 +208,14 @@ public void init(Configuration conf) throws YarnException { FederationStateStoreUtils.setProperty(dataSource, FederationStateStoreUtils.FEDERATION_STORE_URL, url); dataSource.setMaximumPoolSize(maximumPoolSize); - LOG.info("Initialized connection pool to the Federation StateStore " - + "database at address: " + url); + LOG.info("Initialized connection pool to the Federation StateStore database at address: {}.", + url); + try { conn = getConnection(); LOG.debug("Connection created"); } catch (SQLException e) { - FederationStateStoreUtils.logAndThrowRetriableException(LOG, - "Not able to get Connection", e); + FederationStateStoreUtils.logAndThrowRetriableException(LOG, "Not able to get Connection", e); } maxAppsInStateStore = conf.getInt( @@ -226,32 +225,29 @@ public void init(Configuration conf) throws YarnException { @Override public SubClusterRegisterResponse registerSubCluster( - SubClusterRegisterRequest registerSubClusterRequest) - throws YarnException { + SubClusterRegisterRequest registerSubClusterRequest) throws YarnException { // Input validator - FederationMembershipStateStoreInputValidator - .validate(registerSubClusterRequest); + FederationMembershipStateStoreInputValidator.validate(registerSubClusterRequest); CallableStatement cstmt = null; - SubClusterInfo subClusterInfo = - registerSubClusterRequest.getSubClusterInfo(); + SubClusterInfo subClusterInfo = registerSubClusterRequest.getSubClusterInfo(); SubClusterId subClusterId = subClusterInfo.getSubClusterId(); try { cstmt = getCallableStatement(CALL_SP_REGISTER_SUBCLUSTER); // Set the parameters for the stored procedure - cstmt.setString(1, subClusterId.getId()); - cstmt.setString(2, subClusterInfo.getAMRMServiceAddress()); - cstmt.setString(3, subClusterInfo.getClientRMServiceAddress()); - cstmt.setString(4, subClusterInfo.getRMAdminServiceAddress()); - cstmt.setString(5, subClusterInfo.getRMWebServiceAddress()); - cstmt.setString(6, subClusterInfo.getState().toString()); - cstmt.setLong(7, subClusterInfo.getLastStartTime()); - cstmt.setString(8, subClusterInfo.getCapability()); - cstmt.registerOutParameter(9, java.sql.Types.INTEGER); + cstmt.setString("subClusterId_IN", subClusterId.getId()); + cstmt.setString("amRMServiceAddress_IN", subClusterInfo.getAMRMServiceAddress()); + cstmt.setString("clientRMServiceAddress_IN", subClusterInfo.getClientRMServiceAddress()); + cstmt.setString("rmAdminServiceAddress_IN", subClusterInfo.getRMAdminServiceAddress()); + cstmt.setString("rmWebServiceAddress_IN", subClusterInfo.getRMWebServiceAddress()); + cstmt.setString("state_IN", subClusterInfo.getState().toString()); + cstmt.setLong("lastStartTime_IN", subClusterInfo.getLastStartTime()); + cstmt.setString("capability_IN", subClusterInfo.getCapability()); + cstmt.registerOutParameter("rowCount_OUT", java.sql.Types.INTEGER); // Execute the query long startTime = clock.getTime(); @@ -260,30 +256,26 @@ public SubClusterRegisterResponse registerSubCluster( // Check the ROWCOUNT value, if it is equal to 0 it means the call // did not add a new subcluster into FederationStateStore - if (cstmt.getInt(9) == 0) { - String errMsg = "SubCluster " + subClusterId - + " was not registered into the StateStore"; - FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); + int rowCount = cstmt.getInt("rowCount_OUT"); + if (rowCount == 0) { + FederationStateStoreUtils.logAndThrowStoreException(LOG, + "SubCluster %s was not registered into the StateStore.", subClusterId); } // Check the ROWCOUNT value, if it is different from 1 it means the call // had a wrong behavior. Maybe the database is not set correctly. - if (cstmt.getInt(9) != 1) { - String errMsg = "Wrong behavior during registration of SubCluster " - + subClusterId + " into the StateStore"; - FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); + if (rowCount != 1) { + FederationStateStoreUtils.logAndThrowStoreException(LOG, + "Wrong behavior during registration of SubCluster %s into the StateStore", + subClusterId); } - LOG.info( - "Registered the SubCluster " + subClusterId + " into the StateStore"); - FederationStateStoreClientMetrics - .succeededStateStoreCall(stopTime - startTime); + LOG.info("Registered the SubCluster {} into the StateStore.", subClusterId); + FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - startTime); } catch (SQLException e) { FederationStateStoreClientMetrics.failedStateStoreCall(); - FederationStateStoreUtils.logAndThrowRetriableException(LOG, - "Unable to register the SubCluster " + subClusterId - + " into the StateStore", - e); + FederationStateStoreUtils.logAndThrowRetriableException(e, + LOG, "Unable to register the SubCluster %s into the StateStore.", subClusterId); } finally { // Return to the pool the CallableStatement FederationStateStoreUtils.returnToPool(LOG, cstmt); @@ -294,12 +286,10 @@ public SubClusterRegisterResponse registerSubCluster( @Override public SubClusterDeregisterResponse deregisterSubCluster( - SubClusterDeregisterRequest subClusterDeregisterRequest) - throws YarnException { + SubClusterDeregisterRequest subClusterDeregisterRequest) throws YarnException { // Input validator - FederationMembershipStateStoreInputValidator - .validate(subClusterDeregisterRequest); + FederationMembershipStateStoreInputValidator.validate(subClusterDeregisterRequest); CallableStatement cstmt = null; @@ -310,9 +300,9 @@ public SubClusterDeregisterResponse deregisterSubCluster( cstmt = getCallableStatement(CALL_SP_DEREGISTER_SUBCLUSTER); // Set the parameters for the stored procedure - cstmt.setString(1, subClusterId.getId()); - cstmt.setString(2, state.toString()); - cstmt.registerOutParameter(3, java.sql.Types.INTEGER); + cstmt.setString("subClusterId_IN", subClusterId.getId()); + cstmt.setString("state_IN", state.toString()); + cstmt.registerOutParameter("rowCount_OUT", java.sql.Types.INTEGER); // Execute the query long startTime = clock.getTime(); @@ -321,29 +311,25 @@ public SubClusterDeregisterResponse deregisterSubCluster( // Check the ROWCOUNT value, if it is equal to 0 it means the call // did not deregister the subcluster into FederationStateStore - if (cstmt.getInt(3) == 0) { - String errMsg = "SubCluster " + subClusterId + " not found"; - FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); + int rowCount = cstmt.getInt("rowCount_OUT"); + if (rowCount == 0) { + FederationStateStoreUtils.logAndThrowStoreException(LOG, + "SubCluster %s not found.", subClusterId); } // Check the ROWCOUNT value, if it is different from 1 it means the call // had a wrong behavior. Maybe the database is not set correctly. - if (cstmt.getInt(3) != 1) { - String errMsg = "Wrong behavior during deregistration of SubCluster " - + subClusterId + " from the StateStore"; - FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); + if (rowCount != 1) { + FederationStateStoreUtils.logAndThrowStoreException(LOG, + "Wrong behavior during deregistration of SubCluster %s from the StateStore.", + subClusterId); } - - LOG.info("Deregistered the SubCluster " + subClusterId + " state to " - + state.toString()); - FederationStateStoreClientMetrics - .succeededStateStoreCall(stopTime - startTime); + LOG.info("Deregistered the SubCluster {} state to {}.", subClusterId, state.toString()); + FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - startTime); } catch (SQLException e) { FederationStateStoreClientMetrics.failedStateStoreCall(); - FederationStateStoreUtils.logAndThrowRetriableException(LOG, - "Unable to deregister the sub-cluster " + subClusterId + " state to " - + state.toString(), - e); + FederationStateStoreUtils.logAndThrowRetriableException(e, LOG, + "Unable to deregister the sub-cluster %s state to %s.", subClusterId, state.toString()); } finally { // Return to the pool the CallableStatement FederationStateStoreUtils.returnToPool(LOG, cstmt); @@ -353,12 +339,10 @@ public SubClusterDeregisterResponse deregisterSubCluster( @Override public SubClusterHeartbeatResponse subClusterHeartbeat( - SubClusterHeartbeatRequest subClusterHeartbeatRequest) - throws YarnException { + SubClusterHeartbeatRequest subClusterHeartbeatRequest) throws YarnException { // Input validator - FederationMembershipStateStoreInputValidator - .validate(subClusterHeartbeatRequest); + FederationMembershipStateStoreInputValidator.validate(subClusterHeartbeatRequest); CallableStatement cstmt = null; @@ -369,10 +353,10 @@ public SubClusterHeartbeatResponse subClusterHeartbeat( cstmt = getCallableStatement(CALL_SP_SUBCLUSTER_HEARTBEAT); // Set the parameters for the stored procedure - cstmt.setString(1, subClusterId.getId()); - cstmt.setString(2, state.toString()); - cstmt.setString(3, subClusterHeartbeatRequest.getCapability()); - cstmt.registerOutParameter(4, java.sql.Types.INTEGER); + cstmt.setString("subClusterId_IN", subClusterId.getId()); + cstmt.setString("state_IN", state.toString()); + cstmt.setString("capability_IN", subClusterHeartbeatRequest.getCapability()); + cstmt.registerOutParameter("rowCount_OUT", java.sql.Types.INTEGER); // Execute the query long startTime = clock.getTime(); @@ -381,30 +365,25 @@ public SubClusterHeartbeatResponse subClusterHeartbeat( // Check the ROWCOUNT value, if it is equal to 0 it means the call // did not update the subcluster into FederationStateStore - if (cstmt.getInt(4) == 0) { - String errMsg = "SubCluster " + subClusterId.toString() - + " does not exist; cannot heartbeat"; - FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); + int rowCount = cstmt.getInt("rowCount_OUT"); + if (rowCount == 0) { + FederationStateStoreUtils.logAndThrowStoreException(LOG, + "SubCluster %s does not exist; cannot heartbeat.", subClusterId); } // Check the ROWCOUNT value, if it is different from 1 it means the call // had a wrong behavior. Maybe the database is not set correctly. - if (cstmt.getInt(4) != 1) { - String errMsg = - "Wrong behavior during the heartbeat of SubCluster " + subClusterId; - FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); + if (rowCount != 1) { + FederationStateStoreUtils.logAndThrowStoreException(LOG, + "Wrong behavior during the heartbeat of SubCluster %s.", subClusterId); } - LOG.info("Heartbeated the StateStore for the specified SubCluster " - + subClusterId); - FederationStateStoreClientMetrics - .succeededStateStoreCall(stopTime - startTime); + LOG.info("Heartbeated the StateStore for the specified SubCluster {}.", subClusterId); + FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - startTime); } catch (SQLException e) { FederationStateStoreClientMetrics.failedStateStoreCall(); - FederationStateStoreUtils.logAndThrowRetriableException(LOG, - "Unable to heartbeat the StateStore for the specified SubCluster " - + subClusterId, - e); + FederationStateStoreUtils.logAndThrowRetriableException(e, LOG, + "Unable to heartbeat the StateStore for the specified SubCluster %s.", subClusterId); } finally { // Return to the pool the CallableStatement FederationStateStoreUtils.returnToPool(LOG, cstmt); @@ -426,27 +405,27 @@ public GetSubClusterInfoResponse getSubCluster( try { cstmt = getCallableStatement(CALL_SP_GET_SUBCLUSTER); - cstmt.setString(1, subClusterId.getId()); + cstmt.setString("subClusterId_IN", subClusterId.getId()); // Set the parameters for the stored procedure - cstmt.registerOutParameter(2, java.sql.Types.VARCHAR); - cstmt.registerOutParameter(3, java.sql.Types.VARCHAR); - cstmt.registerOutParameter(4, java.sql.Types.VARCHAR); - cstmt.registerOutParameter(5, java.sql.Types.VARCHAR); - cstmt.registerOutParameter(6, java.sql.Types.TIMESTAMP); - cstmt.registerOutParameter(7, java.sql.Types.VARCHAR); - cstmt.registerOutParameter(8, java.sql.Types.BIGINT); - cstmt.registerOutParameter(9, java.sql.Types.VARCHAR); + cstmt.registerOutParameter("amRMServiceAddress_OUT", java.sql.Types.VARCHAR); + cstmt.registerOutParameter("clientRMServiceAddress_OUT", java.sql.Types.VARCHAR); + cstmt.registerOutParameter("rmAdminServiceAddress_OUT", java.sql.Types.VARCHAR); + cstmt.registerOutParameter("rmWebServiceAddress_OUT", java.sql.Types.VARCHAR); + cstmt.registerOutParameter("lastHeartBeat_OUT", java.sql.Types.TIMESTAMP); + cstmt.registerOutParameter("state_OUT", java.sql.Types.VARCHAR); + cstmt.registerOutParameter("lastStartTime_OUT", java.sql.Types.BIGINT); + cstmt.registerOutParameter("capability_OUT", java.sql.Types.VARCHAR); // Execute the query long startTime = clock.getTime(); cstmt.execute(); long stopTime = clock.getTime(); - String amRMAddress = cstmt.getString(2); - String clientRMAddress = cstmt.getString(3); - String rmAdminAddress = cstmt.getString(4); - String webAppAddress = cstmt.getString(5); + String amRMAddress = cstmt.getString("amRMServiceAddress_OUT"); + String clientRMAddress = cstmt.getString("clientRMServiceAddress_OUT"); + String rmAdminAddress = cstmt.getString("rmAdminServiceAddress_OUT"); + String webAppAddress = cstmt.getString("rmWebServiceAddress_OUT"); // first check if the subCluster exists if((amRMAddress == null) || (clientRMAddress == null)) { @@ -454,36 +433,31 @@ public GetSubClusterInfoResponse getSubCluster( return null; } - Timestamp heartBeatTimeStamp = cstmt.getTimestamp(6, utcCalendar); - long lastHeartBeat = - heartBeatTimeStamp != null ? heartBeatTimeStamp.getTime() : 0; + Timestamp heartBeatTimeStamp = cstmt.getTimestamp("lastHeartBeat_OUT", utcCalendar); + long lastHeartBeat = heartBeatTimeStamp != null ? heartBeatTimeStamp.getTime() : 0; - SubClusterState state = SubClusterState.fromString(cstmt.getString(7)); - long lastStartTime = cstmt.getLong(8); - String capability = cstmt.getString(9); + SubClusterState state = SubClusterState.fromString(cstmt.getString("state_OUT")); + long lastStartTime = cstmt.getLong("lastStartTime_OUT"); + String capability = cstmt.getString("capability_OUT"); subClusterInfo = SubClusterInfo.newInstance(subClusterId, amRMAddress, clientRMAddress, rmAdminAddress, webAppAddress, lastHeartBeat, state, lastStartTime, capability); - FederationStateStoreClientMetrics - .succeededStateStoreCall(stopTime - startTime); + FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - startTime); // Check if the output it is a valid subcluster try { - FederationMembershipStateStoreInputValidator - .checkSubClusterInfo(subClusterInfo); + FederationMembershipStateStoreInputValidator.checkSubClusterInfo(subClusterInfo); } catch (FederationStateStoreInvalidInputException e) { - String errMsg = - "SubCluster " + subClusterId.toString() + " does not exist"; - FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); + FederationStateStoreUtils.logAndThrowStoreException(e, LOG, + "SubCluster %s does not exist.", subClusterId); } - LOG.debug("Got the information about the specified SubCluster {}", - subClusterInfo); + LOG.debug("Got the information about the specified SubCluster {}", subClusterInfo); } catch (SQLException e) { FederationStateStoreClientMetrics.failedStateStoreCall(); - FederationStateStoreUtils.logAndThrowRetriableException(LOG, - "Unable to obtain the SubCluster information for " + subClusterId, e); + FederationStateStoreUtils.logAndThrowRetriableException(e, LOG, + "Unable to obtain the SubCluster information for %s.", subClusterId); } finally { // Return to the pool the CallableStatement FederationStateStoreUtils.returnToPool(LOG, cstmt); @@ -496,7 +470,7 @@ public GetSubClustersInfoResponse getSubClusters( GetSubClustersInfoRequest subClustersRequest) throws YarnException { CallableStatement cstmt = null; ResultSet rs = null; - List subClusters = new ArrayList(); + List subClusters = new ArrayList<>(); try { cstmt = getCallableStatement(CALL_SP_GET_SUBCLUSTERS); @@ -509,15 +483,15 @@ public GetSubClustersInfoResponse getSubClusters( while (rs.next()) { // Extract the output for each tuple - String subClusterName = rs.getString(1); - String amRMAddress = rs.getString(2); - String clientRMAddress = rs.getString(3); - String rmAdminAddress = rs.getString(4); - String webAppAddress = rs.getString(5); - long lastHeartBeat = rs.getTimestamp(6, utcCalendar).getTime(); - SubClusterState state = SubClusterState.fromString(rs.getString(7)); - long lastStartTime = rs.getLong(8); - String capability = rs.getString(9); + String subClusterName = rs.getString("subClusterId"); + String amRMAddress = rs.getString("amRMServiceAddress"); + String clientRMAddress = rs.getString("clientRMServiceAddress"); + String rmAdminAddress = rs.getString("rmAdminServiceAddress"); + String webAppAddress = rs.getString("rmWebServiceAddress"); + long lastHeartBeat = rs.getTimestamp("lastHeartBeat", utcCalendar).getTime(); + SubClusterState state = SubClusterState.fromString(rs.getString("state")); + long lastStartTime = rs.getLong("lastStartTime"); + String capability = rs.getString("capability"); SubClusterId subClusterId = SubClusterId.newInstance(subClusterName); SubClusterInfo subClusterInfo = SubClusterInfo.newInstance(subClusterId, @@ -527,15 +501,12 @@ public GetSubClustersInfoResponse getSubClusters( FederationStateStoreClientMetrics .succeededStateStoreCall(stopTime - startTime); - // Check if the output it is a valid subcluster try { - FederationMembershipStateStoreInputValidator - .checkSubClusterInfo(subClusterInfo); + FederationMembershipStateStoreInputValidator.checkSubClusterInfo(subClusterInfo); } catch (FederationStateStoreInvalidInputException e) { - String errMsg = - "SubCluster " + subClusterId.toString() + " is not valid"; - FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); + FederationStateStoreUtils.logAndThrowStoreException(e, LOG, + "SubCluster %s is not valid.", subClusterId); } // Filter the inactive @@ -575,68 +546,61 @@ public AddApplicationHomeSubClusterResponse addApplicationHomeSubCluster( cstmt = getCallableStatement(CALL_SP_ADD_APPLICATION_HOME_SUBCLUSTER); // Set the parameters for the stored procedure - cstmt.setString(1, appId.toString()); - cstmt.setString(2, subClusterId.getId()); - cstmt.registerOutParameter(3, java.sql.Types.VARCHAR); - cstmt.registerOutParameter(4, java.sql.Types.INTEGER); + cstmt.setString("applicationId_IN", appId.toString()); + cstmt.setString("homeSubCluster_IN", subClusterId.getId()); + cstmt.registerOutParameter("storedHomeSubCluster_OUT", java.sql.Types.VARCHAR); + cstmt.registerOutParameter("rowCount_OUT", java.sql.Types.INTEGER); // Execute the query long startTime = clock.getTime(); cstmt.executeUpdate(); long stopTime = clock.getTime(); - subClusterHome = cstmt.getString(3); + subClusterHome = cstmt.getString("storedHomeSubCluster_OUT"); SubClusterId subClusterIdHome = SubClusterId.newInstance(subClusterHome); - FederationStateStoreClientMetrics - .succeededStateStoreCall(stopTime - startTime); + FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - startTime); // For failover reason, we check the returned SubClusterId. // If it is equal to the subclusterId we sent, the call added the new // application into FederationStateStore. If the call returns a different // SubClusterId it means we already tried to insert this application but a // component (Router/StateStore/RM) failed during the submission. + int rowCount = cstmt.getInt("rowCount_OUT"); if (subClusterId.equals(subClusterIdHome)) { // Check the ROWCOUNT value, if it is equal to 0 it means the call // did not add a new application into FederationStateStore - if (cstmt.getInt(4) == 0) { - LOG.info( - "The application {} was not inserted in the StateStore because it" - + " was already present in SubCluster {}", - appId, subClusterHome); - } else if (cstmt.getInt(4) != 1) { + if (rowCount == 0) { + LOG.info("The application {} was not inserted in the StateStore because it" + + " was already present in SubCluster {}", appId, subClusterHome); + } else if (cstmt.getInt("rowCount_OUT") != 1) { // Check the ROWCOUNT value, if it is different from 1 it means the // call had a wrong behavior. Maybe the database is not set correctly. - String errMsg = "Wrong behavior during the insertion of SubCluster " - + subClusterId; - FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); + FederationStateStoreUtils.logAndThrowStoreException(LOG, + "Wrong behavior during the insertion of SubCluster %s.", subClusterId); } - LOG.info("Insert into the StateStore the application: " + appId - + " in SubCluster: " + subClusterHome); + LOG.info("Insert into the StateStore the application: {} in SubCluster: {}.", + appId, subClusterHome); } else { // Check the ROWCOUNT value, if it is different from 0 it means the call // did edited the table - if (cstmt.getInt(4) != 0) { - String errMsg = - "The application " + appId + " does exist but was overwritten"; - FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); + if (rowCount != 0) { + FederationStateStoreUtils.logAndThrowStoreException(LOG, + "The application %s does exist but was overwritten.", appId); } - LOG.info("Application: " + appId + " already present with SubCluster: " - + subClusterHome); + LOG.info("Application: {} already present with SubCluster: {}.", appId, subClusterHome); } } catch (SQLException e) { FederationStateStoreClientMetrics.failedStateStoreCall(); - FederationStateStoreUtils - .logAndThrowRetriableException(LOG, - "Unable to insert the newly generated application " - + request.getApplicationHomeSubCluster().getApplicationId(), - e); + FederationStateStoreUtils.logAndThrowRetriableException(e, LOG, + "Unable to insert the newly generated application %s.", appId); } finally { // Return to the pool the CallableStatement FederationStateStoreUtils.returnToPool(LOG, cstmt); } + return AddApplicationHomeSubClusterResponse .newInstance(SubClusterId.newInstance(subClusterHome)); } @@ -659,9 +623,9 @@ public UpdateApplicationHomeSubClusterResponse updateApplicationHomeSubCluster( cstmt = getCallableStatement(CALL_SP_UPDATE_APPLICATION_HOME_SUBCLUSTER); // Set the parameters for the stored procedure - cstmt.setString(1, appId.toString()); - cstmt.setString(2, subClusterId.getId()); - cstmt.registerOutParameter(3, java.sql.Types.INTEGER); + cstmt.setString("applicationId_IN", appId.toString()); + cstmt.setString("homeSubCluster_IN", subClusterId.getId()); + cstmt.registerOutParameter("rowCount_OUT", java.sql.Types.INTEGER); // Execute the query long startTime = clock.getTime(); @@ -670,31 +634,25 @@ public UpdateApplicationHomeSubClusterResponse updateApplicationHomeSubCluster( // Check the ROWCOUNT value, if it is equal to 0 it means the call // did not update the application into FederationStateStore - if (cstmt.getInt(3) == 0) { - String errMsg = "Application " + appId + " does not exist"; - FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); + int rowCount = cstmt.getInt("rowCount_OUT"); + if (rowCount == 0) { + FederationStateStoreUtils.logAndThrowStoreException(LOG, + "Application %s does not exist.", appId); } // Check the ROWCOUNT value, if it is different from 1 it means the call // had a wrong behavior. Maybe the database is not set correctly. - if (cstmt.getInt(3) != 1) { - String errMsg = - "Wrong behavior during the update of SubCluster " + subClusterId; - FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); + if (cstmt.getInt("rowCount_OUT") != 1) { + FederationStateStoreUtils.logAndThrowStoreException(LOG, + "Wrong behavior during the update of SubCluster %s.", subClusterId); } - LOG.info( - "Update the SubCluster to {} for application {} in the StateStore", + LOG.info("Update the SubCluster to {} for application {} in the StateStore", subClusterId, appId); - FederationStateStoreClientMetrics - .succeededStateStoreCall(stopTime - startTime); - + FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - startTime); } catch (SQLException e) { FederationStateStoreClientMetrics.failedStateStoreCall(); - FederationStateStoreUtils - .logAndThrowRetriableException(LOG, - "Unable to update the application " - + request.getApplicationHomeSubCluster().getApplicationId(), - e); + FederationStateStoreUtils.logAndThrowRetriableException(e, LOG, + "Unable to update the application %s.", appId); } finally { // Return to the pool the CallableStatement FederationStateStoreUtils.returnToPool(LOG, cstmt); @@ -712,44 +670,43 @@ public GetApplicationHomeSubClusterResponse getApplicationHomeSubCluster( SubClusterId homeRM = null; + ApplicationId applicationId = request.getApplicationId(); + try { cstmt = getCallableStatement(CALL_SP_GET_APPLICATION_HOME_SUBCLUSTER); // Set the parameters for the stored procedure - cstmt.setString(1, request.getApplicationId().toString()); - cstmt.registerOutParameter(2, java.sql.Types.VARCHAR); + cstmt.setString("applicationId_IN", applicationId.toString()); + cstmt.registerOutParameter("homeSubCluster_OUT", java.sql.Types.VARCHAR); // Execute the query long startTime = clock.getTime(); cstmt.execute(); long stopTime = clock.getTime(); - if (cstmt.getString(2) != null) { - homeRM = SubClusterId.newInstance(cstmt.getString(2)); + String homeSubCluster = cstmt.getString("homeSubCluster_OUT"); + if (homeSubCluster != null) { + homeRM = SubClusterId.newInstance(homeSubCluster); } else { - String errMsg = - "Application " + request.getApplicationId() + " does not exist"; - FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); + FederationStateStoreUtils.logAndThrowStoreException(LOG, + "Application %s does not exist.", applicationId); } LOG.debug("Got the information about the specified application {}." - + " The AM is running in {}", request.getApplicationId(), homeRM); + + " The AM is running in {}", applicationId, homeRM); - FederationStateStoreClientMetrics - .succeededStateStoreCall(stopTime - startTime); + FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - startTime); } catch (SQLException e) { FederationStateStoreClientMetrics.failedStateStoreCall(); - FederationStateStoreUtils.logAndThrowRetriableException(LOG, - "Unable to obtain the application information " - + "for the specified application " + request.getApplicationId(), - e); + FederationStateStoreUtils.logAndThrowRetriableException(e, LOG, + "Unable to obtain the application information for the specified application %s.", + applicationId); } finally { // Return to the pool the CallableStatement FederationStateStoreUtils.returnToPool(LOG, cstmt); } - return GetApplicationHomeSubClusterResponse - .newInstance(request.getApplicationId(), homeRM); + return GetApplicationHomeSubClusterResponse.newInstance(request.getApplicationId(), homeRM); } @Override @@ -790,8 +747,7 @@ public GetApplicationsHomeSubClusterResponse getApplicationsHomeSubCluster( SubClusterId.newInstance(homeSubCluster))); } - FederationStateStoreClientMetrics - .succeededStateStoreCall(stopTime - startTime); + FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - startTime); } catch (SQLException e) { FederationStateStoreClientMetrics.failedStateStoreCall(); @@ -813,13 +769,13 @@ public DeleteApplicationHomeSubClusterResponse deleteApplicationHomeSubCluster( FederationApplicationHomeSubClusterStoreInputValidator.validate(request); CallableStatement cstmt = null; - + ApplicationId applicationId = request.getApplicationId(); try { cstmt = getCallableStatement(CALL_SP_DELETE_APPLICATION_HOME_SUBCLUSTER); // Set the parameters for the stored procedure - cstmt.setString(1, request.getApplicationId().toString()); - cstmt.registerOutParameter(2, java.sql.Types.INTEGER); + cstmt.setString("applicationId_IN", applicationId.toString()); + cstmt.registerOutParameter("rowCount_OUT", java.sql.Types.INTEGER); // Execute the query long startTime = clock.getTime(); @@ -828,28 +784,25 @@ public DeleteApplicationHomeSubClusterResponse deleteApplicationHomeSubCluster( // Check the ROWCOUNT value, if it is equal to 0 it means the call // did not delete the application from FederationStateStore - if (cstmt.getInt(2) == 0) { - String errMsg = - "Application " + request.getApplicationId() + " does not exist"; - FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); + int rowCount = cstmt.getInt("rowCount_OUT"); + if (rowCount == 0) { + FederationStateStoreUtils.logAndThrowStoreException(LOG, + "Application %s does not exist.", applicationId); } // Check the ROWCOUNT value, if it is different from 1 it means the call // had a wrong behavior. Maybe the database is not set correctly. - if (cstmt.getInt(2) != 1) { - String errMsg = "Wrong behavior during deleting the application " - + request.getApplicationId(); - FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); + if (cstmt.getInt("rowCount_OUT") != 1) { + FederationStateStoreUtils.logAndThrowStoreException(LOG, + "Wrong behavior during deleting the application %s.", applicationId); } - LOG.info("Delete from the StateStore the application: {}", - request.getApplicationId()); - FederationStateStoreClientMetrics - .succeededStateStoreCall(stopTime - startTime); + LOG.info("Delete from the StateStore the application: {}", request.getApplicationId()); + FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - startTime); } catch (SQLException e) { FederationStateStoreClientMetrics.failedStateStoreCall(); - FederationStateStoreUtils.logAndThrowRetriableException(LOG, - "Unable to delete the application " + request.getApplicationId(), e); + FederationStateStoreUtils.logAndThrowRetriableException(e, LOG, + "Unable to delete the application %s.", applicationId); } finally { // Return to the pool the CallableStatement FederationStateStoreUtils.returnToPool(LOG, cstmt); @@ -871,9 +824,9 @@ public GetSubClusterPolicyConfigurationResponse getPolicyConfiguration( cstmt = getCallableStatement(CALL_SP_GET_POLICY_CONFIGURATION); // Set the parameters for the stored procedure - cstmt.setString(1, request.getQueue()); - cstmt.registerOutParameter(2, java.sql.Types.VARCHAR); - cstmt.registerOutParameter(3, java.sql.Types.VARBINARY); + cstmt.setString("queue_IN", request.getQueue()); + cstmt.registerOutParameter("policyType_OUT", java.sql.Types.VARCHAR); + cstmt.registerOutParameter("params_OUT", java.sql.Types.VARBINARY); // Execute the query long startTime = clock.getTime(); @@ -881,10 +834,11 @@ public GetSubClusterPolicyConfigurationResponse getPolicyConfiguration( long stopTime = clock.getTime(); // Check if the output it is a valid policy - if (cstmt.getString(2) != null && cstmt.getBytes(3) != null) { - subClusterPolicyConfiguration = - SubClusterPolicyConfiguration.newInstance(request.getQueue(), - cstmt.getString(2), ByteBuffer.wrap(cstmt.getBytes(3))); + String policyType = cstmt.getString("policyType_OUT"); + byte[] param = cstmt.getBytes("params_OUT"); + if (policyType != null && param != null) { + subClusterPolicyConfiguration = SubClusterPolicyConfiguration.newInstance( + request.getQueue(), policyType, ByteBuffer.wrap(param)); LOG.debug("Selected from StateStore the policy for the queue: {}", subClusterPolicyConfiguration); } else { @@ -892,20 +846,17 @@ public GetSubClusterPolicyConfigurationResponse getPolicyConfiguration( return null; } - FederationStateStoreClientMetrics - .succeededStateStoreCall(stopTime - startTime); + FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - startTime); } catch (SQLException e) { FederationStateStoreClientMetrics.failedStateStoreCall(); - FederationStateStoreUtils.logAndThrowRetriableException(LOG, - "Unable to select the policy for the queue :" + request.getQueue(), - e); + FederationStateStoreUtils.logAndThrowRetriableException(e, LOG, + "Unable to select the policy for the queue : %s." + request.getQueue()); } finally { // Return to the pool the CallableStatement FederationStateStoreUtils.returnToPool(LOG, cstmt); } - return GetSubClusterPolicyConfigurationResponse - .newInstance(subClusterPolicyConfiguration); + return GetSubClusterPolicyConfigurationResponse.newInstance(subClusterPolicyConfiguration); } @Override @@ -923,10 +874,10 @@ public SetSubClusterPolicyConfigurationResponse setPolicyConfiguration( cstmt = getCallableStatement(CALL_SP_SET_POLICY_CONFIGURATION); // Set the parameters for the stored procedure - cstmt.setString(1, policyConf.getQueue()); - cstmt.setString(2, policyConf.getType()); - cstmt.setBytes(3, getByteArray(policyConf.getParams())); - cstmt.registerOutParameter(4, java.sql.Types.INTEGER); + cstmt.setString("queue_IN", policyConf.getQueue()); + cstmt.setString("policyType_IN", policyConf.getType()); + cstmt.setBytes("params_IN", getByteArray(policyConf.getParams())); + cstmt.registerOutParameter("rowCount_OUT", java.sql.Types.INTEGER); // Execute the query long startTime = clock.getTime(); @@ -935,30 +886,25 @@ public SetSubClusterPolicyConfigurationResponse setPolicyConfiguration( // Check the ROWCOUNT value, if it is equal to 0 it means the call // did not add a new policy into FederationStateStore - if (cstmt.getInt(4) == 0) { - String errMsg = "The policy " + policyConf.getQueue() - + " was not insert into the StateStore"; - FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); + int rowCount = cstmt.getInt("rowCount_OUT"); + if (rowCount == 0) { + FederationStateStoreUtils.logAndThrowStoreException(LOG, + "The policy %s was not insert into the StateStore.", policyConf.getQueue()); } // Check the ROWCOUNT value, if it is different from 1 it means the call // had a wrong behavior. Maybe the database is not set correctly. - if (cstmt.getInt(4) != 1) { - String errMsg = - "Wrong behavior during insert the policy " + policyConf.getQueue(); - FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); + if (rowCount != 1) { + FederationStateStoreUtils.logAndThrowStoreException(LOG, + "Wrong behavior during insert the policy %s.", policyConf.getQueue()); } - LOG.info("Insert into the state store the policy for the queue: " - + policyConf.getQueue()); - FederationStateStoreClientMetrics - .succeededStateStoreCall(stopTime - startTime); + LOG.info("Insert into the state store the policy for the queue: {}.", policyConf.getQueue()); + FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - startTime); } catch (SQLException e) { FederationStateStoreClientMetrics.failedStateStoreCall(); - FederationStateStoreUtils.logAndThrowRetriableException(LOG, - "Unable to insert the newly generated policy for the queue :" - + policyConf.getQueue(), - e); + FederationStateStoreUtils.logAndThrowRetriableException(e, LOG, + "Unable to insert the newly generated policy for the queue : %s.", policyConf.getQueue()); } finally { // Return to the pool the CallableStatement FederationStateStoreUtils.returnToPool(LOG, cstmt); @@ -972,8 +918,7 @@ public GetSubClusterPoliciesConfigurationsResponse getPoliciesConfigurations( CallableStatement cstmt = null; ResultSet rs = null; - List policyConfigurations = - new ArrayList(); + List policyConfigurations = new ArrayList<>(); try { cstmt = getCallableStatement(CALL_SP_GET_POLICIES_CONFIGURATIONS); @@ -984,20 +929,17 @@ public GetSubClusterPoliciesConfigurationsResponse getPoliciesConfigurations( long stopTime = clock.getTime(); while (rs.next()) { - // Extract the output for each tuple - String queue = rs.getString(1); - String type = rs.getString(2); - byte[] policyInfo = rs.getBytes(3); + String queue = rs.getString("queue"); + String type = rs.getString("policyType"); + byte[] policyInfo = rs.getBytes("params"); SubClusterPolicyConfiguration subClusterPolicyConfiguration = - SubClusterPolicyConfiguration.newInstance(queue, type, - ByteBuffer.wrap(policyInfo)); + SubClusterPolicyConfiguration.newInstance(queue, type, ByteBuffer.wrap(policyInfo)); policyConfigurations.add(subClusterPolicyConfiguration); } - FederationStateStoreClientMetrics - .succeededStateStoreCall(stopTime - startTime); + FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - startTime); } catch (SQLException e) { FederationStateStoreClientMetrics.failedStateStoreCall(); @@ -1008,8 +950,7 @@ public GetSubClusterPoliciesConfigurationsResponse getPoliciesConfigurations( FederationStateStoreUtils.returnToPool(LOG, cstmt, null, rs); } - return GetSubClusterPoliciesConfigurationsResponse - .newInstance(policyConfigurations); + return GetSubClusterPoliciesConfigurationsResponse.newInstance(policyConfigurations); } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationStateStoreUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationStateStoreUtils.java index 52ef725fb2..f14867a0e6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationStateStoreUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationStateStoreUtils.java @@ -162,6 +162,29 @@ public static void logAndThrowStoreException(Logger log, String errMsgFormat, Ob throw new FederationStateStoreException(errMsg); } + + /** + * Throws an FederationStateStoreException due to an error in + * FederationStateStore. + * + * @param t the throwable raised in the called class. + * @param log the logger interface. + * @param errMsgFormat the error message format string. + * @param args referenced by the format specifiers in the format string. + * @throws YarnException on failure + */ + public static void logAndThrowStoreException( + Throwable t, Logger log, String errMsgFormat, Object... args) throws YarnException { + String errMsg = String.format(errMsgFormat, args); + if (t != null) { + log.error(errMsg, t); + throw new FederationStateStoreException(errMsg, t); + } else { + log.error(errMsg); + throw new FederationStateStoreException(errMsg); + } + } + /** * Throws an FederationStateStoreInvalidInputException due to an * error in FederationStateStore. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/HSQLDBFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/HSQLDBFederationStateStore.java index e90f1dc099..b3bb0764df 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/HSQLDBFederationStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/HSQLDBFederationStateStore.java @@ -325,7 +325,7 @@ public void init(Configuration conf) { try { conf.setInt(YarnConfiguration.FEDERATION_STATESTORE_MAX_APPLICATIONS, 10); super.init(conf); - conn = super.conn; + conn = super.getConn(); LOG.info("Database Init: Start"); @@ -365,7 +365,7 @@ public void init(Configuration conf) { public void initConnection(Configuration conf) { try { super.init(conf); - conn = super.conn; + conn = super.getConn(); } catch (YarnException e1) { LOG.error("ERROR: failed open connection to HSQLDB DB {}.", e1.getMessage()); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestSQLFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestSQLFederationStateStore.java index 4d405a1cd6..cddcf29ffb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestSQLFederationStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestSQLFederationStateStore.java @@ -447,7 +447,7 @@ public void testAddReservationHomeSubClusterAbnormalSituation() throws Exception SQLFederationStateStore sqlFederationStateStore = (SQLFederationStateStore) stateStore; - Connection conn = sqlFederationStateStore.conn; + Connection conn = sqlFederationStateStore.getConn(); conn.prepareStatement(SP_DROP_ADDRESERVATIONHOMESUBCLUSTER).execute(); conn.prepareStatement(SP_ADDRESERVATIONHOMESUBCLUSTER2).execute(); @@ -484,7 +484,7 @@ public void testUpdateReservationHomeSubClusterAbnormalSituation() throws Except SQLFederationStateStore sqlFederationStateStore = (SQLFederationStateStore) stateStore; - Connection conn = sqlFederationStateStore.conn; + Connection conn = sqlFederationStateStore.getConn(); conn.prepareStatement(SP_DROP_UPDATERESERVATIONHOMESUBCLUSTER).execute(); conn.prepareStatement(SP_UPDATERESERVATIONHOMESUBCLUSTER2).execute(); @@ -530,7 +530,7 @@ public void testDeleteReservationHomeSubClusterAbnormalSituation() throws Except SQLFederationStateStore sqlFederationStateStore = (SQLFederationStateStore) stateStore; - Connection conn = sqlFederationStateStore.conn; + Connection conn = sqlFederationStateStore.getConn(); conn.prepareStatement(SP_DROP_DELETERESERVATIONHOMESUBCLUSTER).execute(); conn.prepareStatement(SP_DELETERESERVATIONHOMESUBCLUSTER2).execute();