From c3706597a3f2105da83e25ed2681ffbadb463262 Mon Sep 17 00:00:00 2001 From: slfan1989 <55643692+slfan1989@users.noreply.github.com> Date: Thu, 16 Feb 2023 06:38:41 +0800 Subject: [PATCH] YARN-11349. [Federation] Router Support DelegationToken With SQL. (#5244) --- .../AbstractDelegationTokenSecretManager.java | 4 +- .../MySQL/FederationStateStoreStoredProcs.sql | 73 ++- .../MySQL/FederationStateStoreTables.sql | 21 + .../MySQL/dropStoreProcedures.sql | 14 + .../FederationStateStore/MySQL/dropTables.sql | 6 + .../FederationStateStoreStoredProcs.sql | 271 ++++++++- .../SQLServer/FederationStateStoreTables.sql | 154 +++++ .../SQLServer/FederationStateStoreUser.sql | 4 +- .../SQLServer/dropStoreProcedures.sql | 82 ++- .../SQLServer/dropTables.sql | 28 +- .../SQLServer/dropUser.sql | 8 +- .../impl/MemoryFederationStateStore.java | 25 +- .../store/impl/SQLFederationStateStore.java | 557 ++++++++++++++++-- .../records/RouterRMDTSecretManagerState.java | 4 +- .../store/records/RouterStoreToken.java | 25 +- .../impl/pb/RouterStoreTokenPBImpl.java | 28 + .../federation/store/sql/DatabaseProduct.java | 126 ++++ .../store/sql/FederationQueryRunner.java | 310 ++++++++++ .../store/sql/FederationSQLOutParameter.java | 91 +++ .../store/sql/ResultSetHandler.java | 30 + .../store/sql/RouterMasterKeyHandler.java | 68 +++ .../store/sql/RouterStoreTokenHandler.java | 83 +++ .../federation/store/sql/RowCountHandler.java | 56 ++ .../federation/store/sql/package-info.java | 17 + ...FederationRouterRMTokenInputValidator.java | 1 - .../utils/FederationStateStoreUtils.java | 69 ++- .../utils/FederationStateStoreFacade.java | 36 ++ .../proto/yarn_server_federation_protos.proto | 1 + .../impl/FederationStateStoreBaseTest.java | 33 +- .../impl/HSQLDBFederationStateStore.java | 125 ++++ .../impl/TestMemoryFederationStateStore.java | 2 +- .../impl/TestSQLFederationStateStore.java | 281 ++++----- .../RouterDelegationTokenSupport.java | 65 ++ .../token/delegation/package-info.java | 20 + .../RouterDelegationTokenSecretManager.java | 84 +++ .../TestFederationClientInterceptor.java | 18 +- 36 files changed, 2586 insertions(+), 234 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/sql/DatabaseProduct.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/sql/FederationQueryRunner.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/sql/FederationSQLOutParameter.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/sql/ResultSetHandler.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/sql/RouterMasterKeyHandler.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/sql/RouterStoreTokenHandler.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/sql/RowCountHandler.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/sql/package-info.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/security/token/delegation/RouterDelegationTokenSupport.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/security/token/delegation/package-info.java diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java index d0c0fac6e8..8aaf9bbd8d 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java @@ -866,9 +866,9 @@ private String getTokenRealOwner(TokenIdent id) { /** * Add token stats to the owner to token count mapping. * - * @param id + * @param id token id. */ - private void addTokenForOwnerStats(TokenIdent id) { + protected void addTokenForOwnerStats(TokenIdent id) { String realOwner = getTokenRealOwner(id); tokenOwnerStats.put(realOwner, tokenOwnerStats.getOrDefault(realOwner, 0L)+1); diff --git a/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/MySQL/FederationStateStoreStoredProcs.sql b/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/MySQL/FederationStateStoreStoredProcs.sql index 6461cf2bd7..2edda86cd3 100644 --- a/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/MySQL/FederationStateStoreStoredProcs.sql +++ b/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/MySQL/FederationStateStoreStoredProcs.sql @@ -183,7 +183,7 @@ BEGIN SELECT ROW_COUNT() INTO rowCount_OUT; SELECT homeSubCluster INTO storedHomeSubCluster_OUT FROM reservationsHomeSubCluster - WHERE applicationId = reservationId_IN; + WHERE reservationId = reservationId_IN; END // CREATE PROCEDURE sp_getReservationHomeSubCluster( @@ -219,4 +219,75 @@ BEGIN SELECT ROW_COUNT() INTO rowCount_OUT; END // +CREATE PROCEDURE sp_addMasterKey( + IN keyId_IN bigint, IN masterKey_IN varchar(1024), + OUT rowCount_OUT int) +BEGIN + INSERT INTO masterKeys(keyId, masterKey) + (SELECT keyId_IN, masterKey_IN + FROM masterKeys + WHERE keyId = keyId_IN + HAVING COUNT(*) = 0); + SELECT ROW_COUNT() INTO rowCount_OUT; +END // + +CREATE PROCEDURE sp_getMasterKey( + IN keyId_IN bigint, + OUT masterKey_OUT varchar(1024)) +BEGIN + SELECT masterKey INTO masterKey_OUT + FROM masterKeys + WHERE keyId = keyId_IN; +END // + +CREATE PROCEDURE sp_deleteMasterKey( + IN keyId_IN bigint, OUT rowCount_OUT int) +BEGIN + DELETE FROM masterKeys + WHERE keyId = keyId_IN; + SELECT ROW_COUNT() INTO rowCount_OUT; +END // + +CREATE PROCEDURE sp_addDelegationToken( + IN sequenceNum_IN bigint, IN tokenIdent_IN varchar(1024), + IN token_IN varchar(1024), IN renewDate_IN bigint, + OUT rowCount_OUT int) +BEGIN + INSERT INTO delegationTokens(sequenceNum, tokenIdent, token, renewDate) + (SELECT sequenceNum_IN, tokenIdent_IN, token_IN, renewDate_IN + FROM delegationTokens + WHERE sequenceNum = sequenceNum_IN + HAVING COUNT(*) = 0); + SELECT ROW_COUNT() INTO rowCount_OUT; +END // + +CREATE PROCEDURE sp_getDelegationToken( + IN sequenceNum_IN bigint, OUT tokenIdent_OUT varchar(1024), + OUT token_OUT varchar(1024), OUT renewDate_OUT bigint) +BEGIN + SELECT tokenIdent, token, renewDate INTO tokenIdent_OUT, token_OUT, renewDate_OUT + FROM delegationTokens + WHERE sequenceNum = sequenceNum_IN; +END // + +CREATE PROCEDURE sp_updateDelegationToken( + IN sequenceNum_IN bigint, IN tokenIdent_IN varchar(1024), + IN token_IN varchar(1024), IN renewDate_IN bigint, OUT rowCount_OUT int) +BEGIN + UPDATE delegationTokens + SET tokenIdent = tokenIdent_IN, + token = token_IN, + renewDate = renewDate_IN + WHERE sequenceNum = sequenceNum_IN; + SELECT ROW_COUNT() INTO rowCount_OUT; +END // + +CREATE PROCEDURE sp_deleteDelegationToken( + IN sequenceNum_IN bigint, OUT rowCount_OUT int) +BEGIN + DELETE FROM delegationTokens + WHERE sequenceNum = sequenceNum_IN; + SELECT ROW_COUNT() INTO rowCount_OUT; +END // + DELIMITER ; diff --git a/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/MySQL/FederationStateStoreTables.sql b/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/MySQL/FederationStateStoreTables.sql index d61a10f998..9e864eb5ed 100644 --- a/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/MySQL/FederationStateStoreTables.sql +++ b/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/MySQL/FederationStateStoreTables.sql @@ -52,4 +52,25 @@ CREATE TABLE reservationsHomeSubCluster ( reservationId varchar(128) NOT NULL, homeSubCluster varchar(256) NOT NULL, CONSTRAINT pk_reservationId PRIMARY KEY (reservationId) +); + +CREATE TABLE masterKeys ( + keyId bigint NOT NULL, + masterKey varchar(1024) NOT NULL, + CONSTRAINT pk_keyId PRIMARY KEY (keyId) +); + +CREATE TABLE delegationTokens +( + sequenceNum bigint NOT NULL, + tokenIdent varchar(1024) NOT NULL, + token varchar(1024) NOT NULL, + renewDate bigint NOT NULL, + CONSTRAINT pk_sequenceNum PRIMARY KEY (sequenceNum) +); + +CREATE TABLE sequenceTable ( + sequenceName varchar(255) NOT NULL, + nextVal bigint(20) NOT NULL, + CONSTRAINT pk_sequenceName PRIMARY KEY (sequenceName) ); \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/MySQL/dropStoreProcedures.sql b/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/MySQL/dropStoreProcedures.sql index a2f0b882b3..e7a16c81de 100644 --- a/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/MySQL/dropStoreProcedures.sql +++ b/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/MySQL/dropStoreProcedures.sql @@ -55,3 +55,17 @@ DROP PROCEDURE sp_getReservationsHomeSubCluster; DROP PROCEDURE sp_deleteReservationHomeSubCluster; DROP PROCEDURE sp_updateReservationHomeSubCluster; + +DROP PROCEDURE sp_addMasterKey; + +DROP PROCEDURE sp_getMasterKey; + +DROP PROCEDURE sp_deleteMasterKey; + +DROP PROCEDURE sp_addDelegationToken; + +DROP PROCEDURE sp_getDelegationToken; + +DROP PROCEDURE sp_updateDelegationToken; + +DROP PROCEDURE sp_deleteDelegationToken; \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/MySQL/dropTables.sql b/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/MySQL/dropTables.sql index d29f8652c1..38d00d3cb1 100644 --- a/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/MySQL/dropTables.sql +++ b/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/MySQL/dropTables.sql @@ -27,3 +27,9 @@ DROP TABLE membership; DROP TABLE policies; DROP TABLE reservationsHomeSubCluster; + +DROP TABLE masterKeys; + +DROP TABLE delegationTokens; + +DROP TABLE sequenceTable; diff --git a/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/SQLServer/FederationStateStoreStoredProcs.sql b/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/SQLServer/FederationStateStoreStoredProcs.sql index cc8a79d627..d82b53e73a 100644 --- a/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/SQLServer/FederationStateStoreStoredProcs.sql +++ b/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/SQLServer/FederationStateStoreStoredProcs.sql @@ -128,7 +128,7 @@ AS BEGIN [createTime], row_number() over(order by [createTime] desc) AS app_rank FROM [dbo].[applicationsHomeSubCluster] - WHERE [homeSubCluster] = @homeSubCluster_IN OR @homeSubCluster = '') AS applicationsHomeSubCluster + WHERE [homeSubCluster] = @homeSubCluster_IN OR @homeSubCluster_IN = '') AS applicationsHomeSubCluster WHERE app_rank <= @limit_IN; END TRY @@ -699,4 +699,273 @@ AS BEGIN ) WITH log END CATCH END; +GO + +IF OBJECT_ID ( '[sp_addMasterKey]', 'P' ) IS NOT NULL + DROP PROCEDURE [sp_addMasterKey]; +GO + +CREATE PROCEDURE [dbo].[sp_addMasterKey] + @keyId_IN BIGINT, + @masterKey_IN VARCHAR(1024), + @rowCount_OUT int OUTPUT +AS BEGIN + DECLARE @errorMessage nvarchar(4000) + + BEGIN TRY + BEGIN TRAN + -- If application to sub-cluster map doesn't exist, insert it. + -- Otherwise don't change the current mapping. + IF NOT EXISTS (SELECT TOP 1 * + FROM [dbo].[masterKeys] + WHERE [keyId] = @keyId_IN) + + INSERT INTO [dbo].[masterKeys] ( + [keyId], + [masterKey]) + VALUES ( + @keyId_IN, + @masterKey_IN); + -- End of the IF block + + SELECT @rowCount_OUT = @@ROWCOUNT; + + COMMIT TRAN + END TRY + + BEGIN CATCH + ROLLBACK TRAN + + SET @errorMessage = dbo.func_FormatErrorMessage(ERROR_MESSAGE(), ERROR_LINE()) + + /* raise error and terminate the execution */ + RAISERROR(@errorMessage, --- Error Message + 1, -- Severity + -1 -- State + ) WITH log + END CATCH +END; +GO + +IF OBJECT_ID ( '[sp_getMasterKey]', 'P' ) IS NOT NULL + DROP PROCEDURE [sp_getMasterKey]; +GO + +CREATE PROCEDURE [dbo].[sp_getMasterKey] + @keyId_IN bigint, + @masterKey_OUT VARCHAR(1024) OUTPUT +AS BEGIN + DECLARE @errorMessage nvarchar(4000) + + BEGIN TRY + + SELECT @masterKey_OUT = [masterKey] + FROM [dbo].[masterKeys] + WHERE [keyId] = @keyId_IN; + + END TRY + + BEGIN CATCH + + SET @errorMessage = dbo.func_FormatErrorMessage(ERROR_MESSAGE(), ERROR_LINE()) + + /* raise error and terminate the execution */ + RAISERROR(@errorMessage, --- Error Message + 1, -- Severity + -1 -- State + ) WITH log + END CATCH +END; +GO + +IF OBJECT_ID ( '[sp_deleteMasterKey]', 'P' ) IS NOT NULL + DROP PROCEDURE [sp_deleteMasterKey]; +GO + +CREATE PROCEDURE [dbo].[sp_deleteMasterKey] + @keyId_IN bigint, + @rowCount_OUT int OUTPUT +AS BEGIN + DECLARE @errorMessage nvarchar(4000) + + BEGIN TRY + BEGIN TRAN + + DELETE FROM [dbo].[masterKeys] + WHERE [keyId] = @keyId_IN; + SELECT @rowCount_OUT = @@ROWCOUNT; + + COMMIT TRAN + END TRY + + BEGIN CATCH + ROLLBACK TRAN + + SET @errorMessage = dbo.func_FormatErrorMessage(ERROR_MESSAGE(), ERROR_LINE()) + + /* raise error and terminate the execution */ + RAISERROR(@errorMessage, --- Error Message + 1, -- Severity + -1 -- State + ) WITH log + END CATCH +END; +GO + +IF OBJECT_ID ( '[sp_addDelegationToken]', 'P' ) IS NOT NULL + DROP PROCEDURE [sp_addDelegationToken]; +GO + +CREATE PROCEDURE [dbo].[sp_addDelegationToken] + @sequenceNum_IN BIGINT, + @tokenIdent_IN VARCHAR(1024), + @token_IN VARCHAR(1024), + @renewDate_IN BIGINT, + @rowCount_OUT int OUTPUT +AS BEGIN + DECLARE @errorMessage nvarchar(4000) + + BEGIN TRY + BEGIN TRAN + -- If application to sub-cluster map doesn't exist, insert it. + -- Otherwise don't change the current mapping. + IF NOT EXISTS (SELECT TOP 1 * + FROM [dbo].[delegationTokens] + WHERE [sequenceNum] = @sequenceNum_IN) + + INSERT INTO [dbo].[delegationTokens] ( + [sequenceNum], + [tokenIdent], + [token], + [renewDate]) + VALUES ( + @sequenceNum_IN, + @tokenIdent_IN, + @token_IN, + @renewDate_IN); + -- End of the IF block + + SELECT @rowCount_OUT = @@ROWCOUNT; + + COMMIT TRAN + END TRY + + BEGIN CATCH + ROLLBACK TRAN + + SET @errorMessage = dbo.func_FormatErrorMessage(ERROR_MESSAGE(), ERROR_LINE()) + + /* raise error and terminate the execution */ + RAISERROR(@errorMessage, --- Error Message + 1, -- Severity + -1 -- State + ) WITH log + END CATCH +END; +GO + +IF OBJECT_ID ( '[sp_getDelegationToken]', 'P' ) IS NOT NULL + DROP PROCEDURE [sp_getDelegationToken]; +GO + +CREATE PROCEDURE [dbo].[sp_getDelegationToken] + @sequenceNum_IN BIGINT, + @tokenIdent_OUT VARCHAR(1024) OUTPUT, + @token_OUT VARCHAR(1024) OUTPUT, + @renewDate_OUT BIGINT OUTPUT +AS BEGIN + DECLARE @errorMessage nvarchar(4000) + + BEGIN TRY + + SELECT @tokenIdent_OUT = [tokenIdent], + @token_OUT = [token], + @renewDate_OUT = [renewDate] + FROM [dbo].[delegationTokens] + WHERE [sequenceNum] = @sequenceNum_IN; + + END TRY + + BEGIN CATCH + + SET @errorMessage = dbo.func_FormatErrorMessage(ERROR_MESSAGE(), ERROR_LINE()) + + /* raise error and terminate the execution */ + RAISERROR(@errorMessage, --- Error Message + 1, -- Severity + -1 -- State + ) WITH log + END CATCH +END; +GO + +IF OBJECT_ID ( '[sp_updateDelegationToken]', 'P' ) IS NOT NULL + DROP PROCEDURE [sp_updateDelegationToken]; +GO + +CREATE PROCEDURE [dbo].[sp_updateDelegationToken] + @sequenceNum_IN BIGINT, + @tokenIdent_IN VARCHAR(1024), + @token_IN VARCHAR(1024), + @renewDate_IN BIGINT, + @rowCount_OUT BIGINT OUTPUT +AS BEGIN + DECLARE @errorMessage nvarchar(4000) + + BEGIN TRY + + UPDATE [dbo].[delegationTokens] + SET [tokenIdent] = @tokenIdent_IN, + [token] = @token_IN, + [renewDate] = @renewDate_IN + WHERE [sequenceNum] = @sequenceNum_IN; + SELECT @rowCount_OUT = @@ROWCOUNT; + + END TRY + + BEGIN CATCH + + SET @errorMessage = dbo.func_FormatErrorMessage(ERROR_MESSAGE(), ERROR_LINE()) + + /* raise error and terminate the execution */ + RAISERROR(@errorMessage, --- Error Message + 1, -- Severity + -1 -- State + ) WITH log + END CATCH +END; +GO + +IF OBJECT_ID ( '[sp_deleteDelegationToken]', 'P' ) IS NOT NULL + DROP PROCEDURE [sp_deleteDelegationToken]; +GO + +CREATE PROCEDURE [dbo].[sp_deleteDelegationToken] + @sequenceNum_IN bigint, + @rowCount_OUT int OUTPUT +AS BEGIN + DECLARE @errorMessage nvarchar(4000) + + BEGIN TRY + BEGIN TRAN + + DELETE FROM [dbo].[delegationTokens] + WHERE [sequenceNum] = @sequenceNum_IN; + SELECT @rowCount_OUT = @@ROWCOUNT; + + COMMIT TRAN + END TRY + + BEGIN CATCH + ROLLBACK TRAN + + SET @errorMessage = dbo.func_FormatErrorMessage(ERROR_MESSAGE(), ERROR_LINE()) + + /* raise error and terminate the execution */ + RAISERROR(@errorMessage, --- Error Message + 1, -- Severity + -1 -- State + ) WITH log + END CATCH +END; GO \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/SQLServer/FederationStateStoreTables.sql b/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/SQLServer/FederationStateStoreTables.sql index fb8a1bff55..4d187d4459 100644 --- a/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/SQLServer/FederationStateStoreTables.sql +++ b/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/SQLServer/FederationStateStoreTables.sql @@ -155,4 +155,158 @@ IF NOT EXISTS ( SELECT * FROM [FederationStateStore].sys.tables ELSE PRINT 'Table reservationsHomeSubCluster exists, no operation required...' GO +GO + +IF NOT EXISTS ( SELECT * FROM [FederationStateStore].sys.tables + WHERE name = 'masterKeys' + AND schema_id = SCHEMA_ID('dbo')) + BEGIN + PRINT 'Table masterKeys does not exist, create it...' + + SET ANSI_NULLS ON + + SET QUOTED_IDENTIFIER ON + + SET ANSI_PADDING ON + + CREATE TABLE [dbo].[masterKeys]( + keyId BIGINT NOT NULL, + masterKey VARCHAR(1024) NOT NULL, + CONSTRAINT [pk_keyId] PRIMARY KEY + ( + [keyId] + ) + ) + + SET ANSI_PADDING OFF + + PRINT 'Table masterKeys created.' + END +ELSE + PRINT 'Table masterKeys exists, no operation required...' + GO +GO + +IF NOT EXISTS ( SELECT * FROM [FederationStateStore].sys.tables + WHERE name = 'delegationTokens' + AND schema_id = SCHEMA_ID('dbo')) + BEGIN + PRINT 'Table delegationTokens does not exist, create it...' + + SET ANSI_NULLS ON + + SET QUOTED_IDENTIFIER ON + + SET ANSI_PADDING ON + + CREATE TABLE [dbo].[delegationTokens]( + sequenceNum BIGINT NOT NULL, + tokenIdent VARCHAR(1024) NOT NULL, + token VARCHAR(1024) NOT NULL, + renewDate BIGINT NOT NULL, + CONSTRAINT [pk_sequenceNum] PRIMARY KEY + ( + [sequenceNum] + ) + ) + + SET ANSI_PADDING OFF + + PRINT 'Table delegationTokens created.' + END +ELSE + PRINT 'Table delegationTokens exists, no operation required...' + GO +GO + +IF NOT EXISTS ( SELECT * FROM [FederationStateStore].sys.tables + WHERE name = 'masterKeys' + AND schema_id = SCHEMA_ID('dbo')) + BEGIN + PRINT 'Table masterKeys does not exist, create it...' + + SET ANSI_NULLS ON + + SET QUOTED_IDENTIFIER ON + + SET ANSI_PADDING ON + + CREATE TABLE [dbo].[masterKeys]( + keyId BIGINT NOT NULL, + masterKey VARCHAR(1024) NOT NULL, + CONSTRAINT [pk_keyId] PRIMARY KEY + ( + [keyId] + ) + ) + + SET ANSI_PADDING OFF + + PRINT 'Table masterKeys created.' + END +ELSE + PRINT 'Table masterKeys exists, no operation required...' + GO +GO + +IF NOT EXISTS ( SELECT * FROM [FederationStateStore].sys.tables + WHERE name = 'delegationTokens' + AND schema_id = SCHEMA_ID('dbo')) + BEGIN + PRINT 'Table delegationTokens does not exist, create it...' + + SET ANSI_NULLS ON + + SET QUOTED_IDENTIFIER ON + + SET ANSI_PADDING ON + + CREATE TABLE [dbo].[delegationTokens]( + sequenceNum BIGINT NOT NULL, + tokenIdent VARCHAR(1024) NOT NULL, + token VARCHAR(1024) NOT NULL, + renewDate BIGINT NOT NULL, + CONSTRAINT [pk_sequenceNum] PRIMARY KEY + ( + [sequenceNum] + ) + ) + + SET ANSI_PADDING OFF + + PRINT 'Table delegationTokens created.' + END +ELSE + PRINT 'Table delegationTokens exists, no operation required...' + GO +GO + +IF NOT EXISTS ( SELECT * FROM [FederationStateStore].sys.tables + WHERE name = 'sequenceTable' + AND schema_id = SCHEMA_ID('dbo')) + BEGIN + PRINT 'Table sequenceTable does not exist, create it...' + + SET ANSI_NULLS ON + + SET QUOTED_IDENTIFIER ON + + SET ANSI_PADDING ON + + CREATE TABLE [dbo].[sequenceTable]( + sequenceName VARCHAR(255) NOT NULL, + nextVal bigint NOT NULL + CONSTRAINT [pk_sequenceName] PRIMARY KEY + ( + [sequenceName] + ) + ) + + SET ANSI_PADDING OFF + + PRINT 'Table sequenceTable created.' + END +ELSE + PRINT 'Table sequenceTable exists, no operation required...' + GO GO \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/SQLServer/FederationStateStoreUser.sql b/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/SQLServer/FederationStateStoreUser.sql index 3f9553fbe3..1bee729f84 100644 --- a/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/SQLServer/FederationStateStoreUser.sql +++ b/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/SQLServer/FederationStateStoreUser.sql @@ -21,10 +21,10 @@ USE [FederationStateStore] GO -CREATE LOGIN 'FederationUser' with password = 'FederationPassword', default_database=[FederationStateStore] ; +CREATE LOGIN FederationUser with password = 'Federation@Password', default_database=[FederationStateStore]; GO -CREATE USER 'FederationUser' FOR LOGIN 'FederationUser' WITH default_schema=dbo; +CREATE USER FederationUser FOR LOGIN FederationUser WITH default_schema=dbo; GO EXEC sp_addrolemember 'db_owner', 'FederationUser'; diff --git a/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/SQLServer/dropStoreProcedures.sql b/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/SQLServer/dropStoreProcedures.sql index 6204df2f41..a6e35df1af 100644 --- a/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/SQLServer/dropStoreProcedures.sql +++ b/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/SQLServer/dropStoreProcedures.sql @@ -21,56 +21,102 @@ USE [FederationStateStore] GO -DROP PROCEDURE IF EXISTS [sp_addApplicationHomeSubCluster]; +IF OBJECT_ID ('[sp_addApplicationHomeSubCluster]', 'P') IS NOT NULL + DROP PROCEDURE [sp_addApplicationHomeSubCluster]; GO -DROP PROCEDURE IF EXISTS [sp_updateApplicationHomeSubCluster]; +IF OBJECT_ID ('[sp_updateApplicationHomeSubCluster]', 'P') IS NOT NULL + DROP PROCEDURE [sp_updateApplicationHomeSubCluster]; GO -DROP PROCEDURE IF EXISTS [sp_getApplicationsHomeSubCluster]; +IF OBJECT_ID ('[sp_getApplicationsHomeSubCluster]', 'P') IS NOT NULL + DROP PROCEDURE [sp_getApplicationsHomeSubCluster]; GO -DROP PROCEDURE IF EXISTS [sp_getApplicationHomeSubCluster]; +IF OBJECT_ID ('[sp_getApplicationHomeSubCluster]', 'P') IS NOT NULL + DROP PROCEDURE [sp_getApplicationHomeSubCluster]; GO -DROP PROCEDURE IF EXISTS [sp_deleteApplicationHomeSubCluster]; +IF OBJECT_ID ('[sp_deleteApplicationHomeSubCluster]', 'P') IS NOT NULL + DROP PROCEDURE [sp_deleteApplicationHomeSubCluster]; GO -DROP PROCEDURE IF EXISTS [sp_registerSubCluster]; +IF OBJECT_ID ('[sp_registerSubCluster]', 'P') IS NOT NULL + DROP PROCEDURE [sp_registerSubCluster]; GO -DROP PROCEDURE IF EXISTS [sp_getSubClusters]; +IF OBJECT_ID ('[sp_getSubClusters]', 'P') IS NOT NULL + DROP PROCEDURE [sp_getSubClusters]; GO -DROP PROCEDURE IF EXISTS [sp_getSubCluster]; +IF OBJECT_ID ('[sp_getSubCluster]', 'P') IS NOT NULL + DROP PROCEDURE [sp_getSubCluster]; GO -DROP PROCEDURE IF EXISTS [sp_subClusterHeartbeat]; +IF OBJECT_ID ('[sp_subClusterHeartbeat]', 'P') IS NOT NULL + DROP PROCEDURE [sp_subClusterHeartbeat]; GO -DROP PROCEDURE IF EXISTS [sp_deregisterSubCluster]; +IF OBJECT_ID ('[sp_deregisterSubCluster]', 'P') IS NOT NULL + DROP PROCEDURE [sp_deregisterSubCluster]; GO -DROP PROCEDURE IF EXISTS [sp_setPolicyConfiguration]; +IF OBJECT_ID ('[sp_setPolicyConfiguration]', 'P') IS NOT NULL + DROP PROCEDURE [sp_setPolicyConfiguration]; GO -DROP PROCEDURE IF EXISTS [sp_getPolicyConfiguration]; +IF OBJECT_ID ('[sp_getPolicyConfiguration]', 'P') IS NOT NULL + DROP PROCEDURE [sp_getPolicyConfiguration]; GO -DROP PROCEDURE IF EXISTS [sp_getPoliciesConfigurations]; +IF OBJECT_ID ('[sp_getPoliciesConfigurations]', 'P') IS NOT NULL + DROP PROCEDURE [sp_getPoliciesConfigurations]; GO -DROP PROCEDURE IF EXISTS [sp_addApplicationHomeSubCluster]; +IF OBJECT_ID ('[sp_addReservationHomeSubCluster]', 'P') IS NOT NULL + DROP PROCEDURE [sp_addReservationHomeSubCluster]; GO -DROP PROCEDURE IF EXISTS [sp_updateReservationHomeSubCluster]; +IF OBJECT_ID ('[sp_updateReservationHomeSubCluster]', 'P') IS NOT NULL + DROP PROCEDURE [sp_updateReservationHomeSubCluster]; GO -DROP PROCEDURE IF EXISTS [sp_getReservationsHomeSubCluster]; +IF OBJECT_ID ('[sp_getReservationsHomeSubCluster]', 'P') IS NOT NULL + DROP PROCEDURE [sp_getReservationsHomeSubCluster]; GO -DROP PROCEDURE IF EXISTS [sp_getReservationHomeSubCluster]; +IF OBJECT_ID ('[sp_getReservationHomeSubCluster]', 'P') IS NOT NULL + DROP PROCEDURE [sp_getReservationHomeSubCluster]; GO -DROP PROCEDURE IF EXISTS [sp_deleteReservationHomeSubCluster]; +IF OBJECT_ID ('[sp_deleteReservationHomeSubCluster]', 'P') IS NOT NULL + DROP PROCEDURE [sp_deleteReservationHomeSubCluster]; GO + +IF OBJECT_ID ('[sp_addMasterKey]', 'P') IS NOT NULL + DROP PROCEDURE [sp_addMasterKey]; +GO + +IF OBJECT_ID ('[sp_getMasterKey]', 'P') IS NOT NULL + DROP PROCEDURE [sp_getMasterKey]; +GO + +IF OBJECT_ID ('[sp_deleteMasterKey]', 'P') IS NOT NULL + DROP PROCEDURE [sp_deleteMasterKey]; +GO + +IF OBJECT_ID ('[sp_addDelegationToken]', 'P') IS NOT NULL + DROP PROCEDURE [sp_addDelegationToken]; +GO + +IF OBJECT_ID ('[sp_getDelegationToken]', 'P') IS NOT NULL + DROP PROCEDURE [sp_getDelegationToken]; +GO + +IF OBJECT_ID ('[sp_updateDelegationToken]', 'P') IS NOT NULL + DROP PROCEDURE [sp_updateDelegationToken]; +GO + +IF OBJECT_ID ('[sp_deleteDelegationToken]', 'P') IS NOT NULL + DROP PROCEDURE [sp_deleteDelegationToken]; +GO \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/SQLServer/dropTables.sql b/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/SQLServer/dropTables.sql index 9bcacb7f88..9a2188cbe1 100644 --- a/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/SQLServer/dropTables.sql +++ b/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/SQLServer/dropTables.sql @@ -21,14 +21,34 @@ USE [FederationStateStore] GO -DROP TABLE [applicationsHomeSubCluster]; +IF OBJECT_ID ( '[sp_deregisterSubCluster]', 'U' ) IS NOT NULL + DROP TABLE [sp_deregisterSubCluster]; GO -DROP TABLE [membership]; +IF OBJECT_ID ( '[membership]', 'U' ) IS NOT NULL + DROP TABLE [membership]; GO -DROP TABLE [policies]; +IF OBJECT_ID ( '[policies]', 'U' ) IS NOT NULL + DROP TABLE [policies]; GO -DROP TABLE [reservationsHomeSubCluster]; +IF OBJECT_ID ( '[applicationsHomeSubCluster]', 'U' ) IS NOT NULL + DROP TABLE [applicationsHomeSubCluster]; +GO + +IF OBJECT_ID ( '[reservationsHomeSubCluster]', 'U' ) IS NOT NULL + DROP TABLE [reservationsHomeSubCluster]; +GO + +IF OBJECT_ID ( '[masterKeys]', 'U' ) IS NOT NULL + DROP TABLE [masterKeys]; +GO + +IF OBJECT_ID ( '[delegationTokens]', 'U' ) IS NOT NULL + DROP TABLE [delegationTokens]; +GO + +IF OBJECT_ID ( '[sequenceTable]', 'U' ) IS NOT NULL + DROP TABLE [sequenceTable]; GO diff --git a/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/SQLServer/dropUser.sql b/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/SQLServer/dropUser.sql index 6d5203a52e..2d338606e7 100644 --- a/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/SQLServer/dropUser.sql +++ b/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/SQLServer/dropUser.sql @@ -18,5 +18,11 @@ -- Script to drop the user from Federation StateStore in MySQL -DROP USER 'FederationUser'; +USE [FederationStateStore] +GO + +DROP USER FederationUser; +GO + +DROP LOGIN FederationUser; GO \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java index 41ade680be..d44c30eef2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java @@ -494,8 +494,7 @@ public RouterRMTokenResponse storeNewToken(RouterRMTokenRequest request) RouterStoreToken storeToken = request.getRouterStoreToken(); RMDelegationTokenIdentifier tokenIdentifier = (RMDelegationTokenIdentifier) storeToken.getTokenIdentifier(); - Long renewDate = storeToken.getRenewDate(); - storeOrUpdateRouterRMDT(tokenIdentifier, renewDate, false); + storeOrUpdateRouterRMDT(tokenIdentifier, storeToken, false); return RouterRMTokenResponse.newInstance(storeToken); } @@ -505,10 +504,10 @@ public RouterRMTokenResponse updateStoredToken(RouterRMTokenRequest request) RouterStoreToken storeToken = request.getRouterStoreToken(); RMDelegationTokenIdentifier tokenIdentifier = (RMDelegationTokenIdentifier) storeToken.getTokenIdentifier(); - Long renewDate = storeToken.getRenewDate(); - Map rmDTState = routerRMSecretManagerState.getTokenState(); + Map rmDTState = + routerRMSecretManagerState.getTokenState(); rmDTState.remove(tokenIdentifier); - storeOrUpdateRouterRMDT(tokenIdentifier, renewDate, true); + storeOrUpdateRouterRMDT(tokenIdentifier, storeToken, true); return RouterRMTokenResponse.newInstance(storeToken); } @@ -518,7 +517,8 @@ public RouterRMTokenResponse removeStoredToken(RouterRMTokenRequest request) RouterStoreToken storeToken = request.getRouterStoreToken(); RMDelegationTokenIdentifier tokenIdentifier = (RMDelegationTokenIdentifier) storeToken.getTokenIdentifier(); - Map rmDTState = routerRMSecretManagerState.getTokenState(); + Map rmDTState = + routerRMSecretManagerState.getTokenState(); rmDTState.remove(tokenIdentifier); return RouterRMTokenResponse.newInstance(storeToken); } @@ -529,13 +529,13 @@ public RouterRMTokenResponse getTokenByRouterStoreToken(RouterRMTokenRequest req RouterStoreToken storeToken = request.getRouterStoreToken(); RMDelegationTokenIdentifier tokenIdentifier = (RMDelegationTokenIdentifier) storeToken.getTokenIdentifier(); - Map rmDTState = routerRMSecretManagerState.getTokenState(); + Map rmDTState = + routerRMSecretManagerState.getTokenState(); if (!rmDTState.containsKey(tokenIdentifier)) { LOG.info("Router RMDelegationToken: {} does not exist.", tokenIdentifier); throw new IOException("Router RMDelegationToken: " + tokenIdentifier + " does not exist."); } - RouterStoreToken resultToken = - RouterStoreToken.newInstance(tokenIdentifier, rmDTState.get(tokenIdentifier)); + RouterStoreToken resultToken = rmDTState.get(tokenIdentifier); return RouterRMTokenResponse.newInstance(resultToken); } @@ -565,13 +565,14 @@ public int incrementCurrentKeyId() { } private void storeOrUpdateRouterRMDT(RMDelegationTokenIdentifier rmDTIdentifier, - Long renewDate, boolean isUpdate) throws IOException { - Map rmDTState = routerRMSecretManagerState.getTokenState(); + RouterStoreToken routerStoreToken, boolean isUpdate) throws IOException { + Map rmDTState = + routerRMSecretManagerState.getTokenState(); if (rmDTState.containsKey(rmDTIdentifier)) { LOG.info("Error storing info for RMDelegationToken: {}.", rmDTIdentifier); throw new IOException("Router RMDelegationToken: " + rmDTIdentifier + "is already stored."); } - rmDTState.put(rmDTIdentifier, renewDate); + rmDTState.put(rmDTIdentifier, routerStoreToken); if (!isUpdate) { routerRMSecretManagerState.setDtSequenceNumber(rmDTIdentifier.getSequenceNumber()); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java index 1e3f3a12f3..f16fe673ce 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java @@ -33,10 +33,12 @@ import org.apache.commons.lang3.NotImplementedException; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.token.delegation.DelegationKey; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.security.client.YARNDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreInvalidInputException; import org.apache.hadoop.yarn.server.federation.store.metrics.FederationStateStoreClientMetrics; @@ -86,11 +88,19 @@ import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKeyResponse; import org.apache.hadoop.yarn.server.federation.store.records.RouterRMTokenRequest; import org.apache.hadoop.yarn.server.federation.store.records.RouterRMTokenResponse; +import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKey; +import org.apache.hadoop.yarn.server.federation.store.records.RouterStoreToken; import org.apache.hadoop.yarn.server.federation.store.utils.FederationApplicationHomeSubClusterStoreInputValidator; import org.apache.hadoop.yarn.server.federation.store.utils.FederationMembershipStateStoreInputValidator; import org.apache.hadoop.yarn.server.federation.store.utils.FederationPolicyStoreInputValidator; -import org.apache.hadoop.yarn.server.federation.store.utils.FederationStateStoreUtils; import org.apache.hadoop.yarn.server.federation.store.utils.FederationReservationHomeSubClusterStoreInputValidator; +import org.apache.hadoop.yarn.server.federation.store.utils.FederationRouterRMTokenInputValidator; +import org.apache.hadoop.yarn.server.federation.store.utils.FederationStateStoreUtils; +import org.apache.hadoop.yarn.server.federation.store.sql.FederationSQLOutParameter; +import org.apache.hadoop.yarn.server.federation.store.sql.FederationQueryRunner; +import org.apache.hadoop.yarn.server.federation.store.sql.RouterMasterKeyHandler; +import org.apache.hadoop.yarn.server.federation.store.sql.RouterStoreTokenHandler; +import org.apache.hadoop.yarn.server.federation.store.sql.RowCountHandler; import org.apache.hadoop.yarn.server.records.Version; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.MonotonicClock; @@ -100,6 +110,13 @@ import org.apache.hadoop.classification.VisibleForTesting; import com.zaxxer.hikari.HikariDataSource; +import static java.sql.Types.INTEGER; +import static java.sql.Types.VARCHAR; +import static java.sql.Types.BIGINT; +import static org.apache.hadoop.yarn.server.federation.store.sql.FederationQueryRunner.YARN_ROUTER_CURRENT_KEY_ID; +import static org.apache.hadoop.yarn.server.federation.store.sql.FederationQueryRunner.YARN_ROUTER_SEQUENCE_NUM; +import static org.apache.hadoop.yarn.server.federation.store.utils.FederationStateStoreUtils.convertMasterKeyToDelegationKey; + /** * SQL implementation of {@link FederationStateStore}. */ @@ -164,6 +181,27 @@ public class SQLFederationStateStore implements FederationStateStore { protected static final String CALL_SP_UPDATE_RESERVATION_HOME_SUBCLUSTER = "{call sp_updateReservationHomeSubCluster(?, ?, ?)}"; + protected static final String CALL_SP_ADD_MASTERKEY = + "{call sp_addMasterKey(?, ?, ?)}"; + + protected static final String CALL_SP_GET_MASTERKEY = + "{call sp_getMasterKey(?, ?)}"; + + protected static final String CALL_SP_DELETE_MASTERKEY = + "{call sp_deleteMasterKey(?, ?)}"; + + protected static final String CALL_SP_ADD_DELEGATIONTOKEN = + "{call sp_addDelegationToken(?, ?, ?, ?, ?)}"; + + protected static final String CALL_SP_GET_DELEGATIONTOKEN = + "{call sp_getDelegationToken(?, ?, ?, ?)}"; + + protected static final String CALL_SP_UPDATE_DELEGATIONTOKEN = + "{call sp_updateDelegationToken(?, ?, ?, ?, ?)}"; + + protected static final String CALL_SP_DELETE_DELEGATIONTOKEN = + "{call sp_deleteDelegationToken(?, ?)}"; + private Calendar utcCalendar = Calendar.getInstance(TimeZone.getTimeZone("UTC")); @@ -247,7 +285,7 @@ public SubClusterRegisterResponse registerSubCluster( cstmt.setString("state_IN", subClusterInfo.getState().toString()); cstmt.setLong("lastStartTime_IN", subClusterInfo.getLastStartTime()); cstmt.setString("capability_IN", subClusterInfo.getCapability()); - cstmt.registerOutParameter("rowCount_OUT", java.sql.Types.INTEGER); + cstmt.registerOutParameter("rowCount_OUT", INTEGER); // Execute the query long startTime = clock.getTime(); @@ -302,7 +340,7 @@ public SubClusterDeregisterResponse deregisterSubCluster( // Set the parameters for the stored procedure cstmt.setString("subClusterId_IN", subClusterId.getId()); cstmt.setString("state_IN", state.toString()); - cstmt.registerOutParameter("rowCount_OUT", java.sql.Types.INTEGER); + cstmt.registerOutParameter("rowCount_OUT", INTEGER); // Execute the query long startTime = clock.getTime(); @@ -356,7 +394,7 @@ public SubClusterHeartbeatResponse subClusterHeartbeat( cstmt.setString("subClusterId_IN", subClusterId.getId()); cstmt.setString("state_IN", state.toString()); cstmt.setString("capability_IN", subClusterHeartbeatRequest.getCapability()); - cstmt.registerOutParameter("rowCount_OUT", java.sql.Types.INTEGER); + cstmt.registerOutParameter("rowCount_OUT", INTEGER); // Execute the query long startTime = clock.getTime(); @@ -408,14 +446,14 @@ public GetSubClusterInfoResponse getSubCluster( cstmt.setString("subClusterId_IN", subClusterId.getId()); // Set the parameters for the stored procedure - cstmt.registerOutParameter("amRMServiceAddress_OUT", java.sql.Types.VARCHAR); - cstmt.registerOutParameter("clientRMServiceAddress_OUT", java.sql.Types.VARCHAR); - cstmt.registerOutParameter("rmAdminServiceAddress_OUT", java.sql.Types.VARCHAR); - cstmt.registerOutParameter("rmWebServiceAddress_OUT", java.sql.Types.VARCHAR); + cstmt.registerOutParameter("amRMServiceAddress_OUT", VARCHAR); + cstmt.registerOutParameter("clientRMServiceAddress_OUT", VARCHAR); + cstmt.registerOutParameter("rmAdminServiceAddress_OUT", VARCHAR); + cstmt.registerOutParameter("rmWebServiceAddress_OUT", VARCHAR); cstmt.registerOutParameter("lastHeartBeat_OUT", java.sql.Types.TIMESTAMP); - cstmt.registerOutParameter("state_OUT", java.sql.Types.VARCHAR); - cstmt.registerOutParameter("lastStartTime_OUT", java.sql.Types.BIGINT); - cstmt.registerOutParameter("capability_OUT", java.sql.Types.VARCHAR); + cstmt.registerOutParameter("state_OUT", VARCHAR); + cstmt.registerOutParameter("lastStartTime_OUT", BIGINT); + cstmt.registerOutParameter("capability_OUT", VARCHAR); // Execute the query long startTime = clock.getTime(); @@ -548,8 +586,8 @@ public AddApplicationHomeSubClusterResponse addApplicationHomeSubCluster( // Set the parameters for the stored procedure cstmt.setString("applicationId_IN", appId.toString()); cstmt.setString("homeSubCluster_IN", subClusterId.getId()); - cstmt.registerOutParameter("storedHomeSubCluster_OUT", java.sql.Types.VARCHAR); - cstmt.registerOutParameter("rowCount_OUT", java.sql.Types.INTEGER); + cstmt.registerOutParameter("storedHomeSubCluster_OUT", VARCHAR); + cstmt.registerOutParameter("rowCount_OUT", INTEGER); // Execute the query long startTime = clock.getTime(); @@ -625,7 +663,7 @@ public UpdateApplicationHomeSubClusterResponse updateApplicationHomeSubCluster( // Set the parameters for the stored procedure cstmt.setString("applicationId_IN", appId.toString()); cstmt.setString("homeSubCluster_IN", subClusterId.getId()); - cstmt.registerOutParameter("rowCount_OUT", java.sql.Types.INTEGER); + cstmt.registerOutParameter("rowCount_OUT", INTEGER); // Execute the query long startTime = clock.getTime(); @@ -677,7 +715,7 @@ public GetApplicationHomeSubClusterResponse getApplicationHomeSubCluster( // Set the parameters for the stored procedure cstmt.setString("applicationId_IN", applicationId.toString()); - cstmt.registerOutParameter("homeSubCluster_OUT", java.sql.Types.VARCHAR); + cstmt.registerOutParameter("homeSubCluster_OUT", VARCHAR); // Execute the query long startTime = clock.getTime(); @@ -775,7 +813,7 @@ public DeleteApplicationHomeSubClusterResponse deleteApplicationHomeSubCluster( // Set the parameters for the stored procedure cstmt.setString("applicationId_IN", applicationId.toString()); - cstmt.registerOutParameter("rowCount_OUT", java.sql.Types.INTEGER); + cstmt.registerOutParameter("rowCount_OUT", INTEGER); // Execute the query long startTime = clock.getTime(); @@ -825,7 +863,7 @@ public GetSubClusterPolicyConfigurationResponse getPolicyConfiguration( // Set the parameters for the stored procedure cstmt.setString("queue_IN", request.getQueue()); - cstmt.registerOutParameter("policyType_OUT", java.sql.Types.VARCHAR); + cstmt.registerOutParameter("policyType_OUT", VARCHAR); cstmt.registerOutParameter("params_OUT", java.sql.Types.VARBINARY); // Execute the query @@ -877,7 +915,7 @@ public SetSubClusterPolicyConfigurationResponse setPolicyConfiguration( cstmt.setString("queue_IN", policyConf.getQueue()); cstmt.setString("policyType_IN", policyConf.getType()); cstmt.setBytes("params_IN", getByteArray(policyConf.getParams())); - cstmt.registerOutParameter("rowCount_OUT", java.sql.Types.INTEGER); + cstmt.registerOutParameter("rowCount_OUT", INTEGER); // Execute the query long startTime = clock.getTime(); @@ -984,6 +1022,22 @@ protected Connection getConnection() throws SQLException { return dataSource.getConnection(); } + /** + * Get a connection from the DataSource pool. + * + * @param isCommitted Whether to enable automatic transaction commit. + * If set to true, turn on transaction autocommit, + * if set to false, turn off transaction autocommit. + * + * @return a connection from the DataSource pool. + * @throws SQLException on failure. + */ + protected Connection getConnection(boolean isCommitted) throws SQLException { + Connection dbConn = getConnection(); + dbConn.setAutoCommit(isCommitted); + return dbConn; + } + @VisibleForTesting protected CallableStatement getCallableStatement(String procedure) throws SQLException { @@ -1029,9 +1083,9 @@ public AddReservationHomeSubClusterResponse addReservationHomeSubCluster( // 2)IN homeSubCluster_IN varchar(256) cstmt.setString("homeSubCluster_IN", subClusterId.getId()); // 3) OUT storedHomeSubCluster_OUT varchar(256) - cstmt.registerOutParameter("storedHomeSubCluster_OUT", java.sql.Types.VARCHAR); + cstmt.registerOutParameter("storedHomeSubCluster_OUT", VARCHAR); // 4) OUT rowCount_OUT int - cstmt.registerOutParameter("rowCount_OUT", java.sql.Types.INTEGER); + cstmt.registerOutParameter("rowCount_OUT", INTEGER); // Execute the query long startTime = clock.getTime(); @@ -1119,7 +1173,7 @@ public GetReservationHomeSubClusterResponse getReservationHomeSubCluster( // 1)IN reservationId_IN varchar(128) cstmt.setString("reservationId_IN", reservationId.toString()); // 2)OUT homeSubCluster_OUT varchar(256) - cstmt.registerOutParameter("homeSubCluster_OUT", java.sql.Types.VARCHAR); + cstmt.registerOutParameter("homeSubCluster_OUT", VARCHAR); // Execute the query long startTime = clock.getTime(); @@ -1237,7 +1291,7 @@ public DeleteReservationHomeSubClusterResponse deleteReservationHomeSubCluster( // 1)IN reservationId_IN varchar(128) cstmt.setString("reservationId_IN", reservationId.toString()); // 2)OUT rowCount_OUT int - cstmt.registerOutParameter("rowCount_OUT", java.sql.Types.INTEGER); + cstmt.registerOutParameter("rowCount_OUT", INTEGER); // Execute the query long startTime = clock.getTime(); @@ -1306,7 +1360,7 @@ public UpdateReservationHomeSubClusterResponse updateReservationHomeSubCluster( // 2)IN homeSubCluster_IN varchar(256) cstmt.setString("homeSubCluster_IN", subClusterId.getId()); // 3)OUT rowCount_OUT int - cstmt.registerOutParameter("rowCount_OUT", java.sql.Types.INTEGER); + cstmt.registerOutParameter("rowCount_OUT", INTEGER); // Execute the query long startTime = clock.getTime(); @@ -1353,70 +1407,503 @@ public Connection getConn() { return conn; } + /** + * SQLFederationStateStore Supports Store New MasterKey. + * + * @param request The request contains RouterMasterKey, which is an abstraction for DelegationKey. + * @return routerMasterKeyResponse, the response contains the RouterMasterKey. + * @throws YarnException if the call to the state store is unsuccessful. + * @throws IOException An IO Error occurred. + */ @Override public RouterMasterKeyResponse storeNewMasterKey(RouterMasterKeyRequest request) throws YarnException, IOException { - throw new NotImplementedException("Code is not implemented"); + + // Step1: Verify parameters to ensure that key fields are not empty. + FederationRouterRMTokenInputValidator.validate(request); + + // Step2: Parse the parameters and serialize the DelegationKey as a string. + DelegationKey delegationKey = convertMasterKeyToDelegationKey(request); + int keyId = delegationKey.getKeyId(); + String delegationKeyStr = FederationStateStoreUtils.encodeWritable(delegationKey); + + // Step3. store data in database. + try { + + FederationSQLOutParameter rowCountOUT = + new FederationSQLOutParameter<>("rowCount_OUT", INTEGER, Integer.class); + + // Execute the query + long startTime = clock.getTime(); + Integer rowCount = getRowCountByProcedureSQL(CALL_SP_ADD_MASTERKEY, keyId, + delegationKeyStr, rowCountOUT); + long stopTime = clock.getTime(); + + // We hope that 1 record can be written to the database. + // If the number of records is not 1, it means that the data was written incorrectly. + if (rowCount != 1) { + FederationStateStoreUtils.logAndThrowStoreException(LOG, + "Wrong behavior during the insertion of masterKey, keyId = %s. " + + "please check the records of the database.", String.valueOf(keyId)); + } + FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - startTime); + } catch (SQLException e) { + FederationStateStoreClientMetrics.failedStateStoreCall(); + FederationStateStoreUtils.logAndThrowRetriableException(e, LOG, + "Unable to insert the newly masterKey, keyId = %s.", String.valueOf(keyId)); + } + + // Step4. Query Data from the database and return the result. + return getMasterKeyByDelegationKey(request); } + /** + * SQLFederationStateStore Supports Remove MasterKey. + * + * Defined the sp_deleteMasterKey procedure. + * This procedure requires 1 input parameters, 1 output parameters. + * Input parameters + * 1. IN keyId_IN int + * Output parameters + * 2. OUT rowCount_OUT int + * + * @param request The request contains RouterMasterKey, which is an abstraction for DelegationKey + * @return routerMasterKeyResponse, the response contains the RouterMasterKey. + * @throws YarnException if the call to the state store is unsuccessful. + * @throws IOException An IO Error occurred. + */ @Override public RouterMasterKeyResponse removeStoredMasterKey(RouterMasterKeyRequest request) throws YarnException, IOException { - throw new NotImplementedException("Code is not implemented"); + + // Step1: Verify parameters to ensure that key fields are not empty. + FederationRouterRMTokenInputValidator.validate(request); + + // Step2: Parse parameters and get KeyId. + RouterMasterKey paramMasterKey = request.getRouterMasterKey(); + int paramKeyId = paramMasterKey.getKeyId(); + + // Step3. Clear data from database. + try { + + // Execute the query + long startTime = clock.getTime(); + FederationSQLOutParameter rowCountOUT = + new FederationSQLOutParameter<>("rowCount_OUT", INTEGER, Integer.class); + Integer rowCount = getRowCountByProcedureSQL(CALL_SP_DELETE_MASTERKEY, + paramKeyId, rowCountOUT); + long stopTime = clock.getTime(); + + // if it is equal to 0 it means the call + // did not delete the reservation from FederationStateStore + if (rowCount == 0) { + FederationStateStoreUtils.logAndThrowStoreException(LOG, + "masterKeyId = %s does not exist.", String.valueOf(paramKeyId)); + } else if (rowCount != 1) { + // if it is different from 1 it means the call + // had a wrong behavior. Maybe the database is not set correctly. + FederationStateStoreUtils.logAndThrowStoreException(LOG, + "Wrong behavior during deleting the keyId %s. " + + "The database is expected to delete 1 record, " + + "but the number of deleted records returned by the database is greater than 1, " + + "indicating that a duplicate masterKey occurred during the deletion process.", + paramKeyId); + } + + LOG.info("Delete from the StateStore the keyId: {}.", paramKeyId); + FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - startTime); + return RouterMasterKeyResponse.newInstance(paramMasterKey); + } catch (SQLException e) { + FederationStateStoreClientMetrics.failedStateStoreCall(); + FederationStateStoreUtils.logAndThrowRetriableException(e, LOG, + "Unable to delete the keyId %s.", paramKeyId); + } + + throw new YarnException("Unable to delete the masterKey, keyId = " + paramKeyId); } + /** + * SQLFederationStateStore Supports Remove MasterKey. + * + * Defined the sp_getMasterKey procedure. + * this procedure requires 2 parameters. + * Input parameters: + * 1. IN keyId_IN int + * Output parameters: + * 2. OUT masterKey_OUT varchar(1024) + * + * @param request The request contains RouterMasterKey, which is an abstraction for DelegationKey + * @return routerMasterKeyResponse, the response contains the RouterMasterKey. + * @throws YarnException if the call to the state store is unsuccessful. + * @throws IOException An IO Error occurred. + */ @Override public RouterMasterKeyResponse getMasterKeyByDelegationKey(RouterMasterKeyRequest request) throws YarnException, IOException { - throw new NotImplementedException("Code is not implemented"); + // Step1: Verify parameters to ensure that key fields are not empty. + FederationRouterRMTokenInputValidator.validate(request); + + // Step2: Parse parameters and get KeyId. + RouterMasterKey paramMasterKey = request.getRouterMasterKey(); + int paramKeyId = paramMasterKey.getKeyId(); + + // Step3: Call the stored procedure to get the result. + try { + + FederationQueryRunner runner = new FederationQueryRunner(); + FederationSQLOutParameter masterKeyOUT = + new FederationSQLOutParameter<>("masterKey_OUT", VARCHAR, String.class); + + // Execute the query + long startTime = clock.getTime(); + RouterMasterKey routerMasterKey = runner.execute( + conn, CALL_SP_GET_MASTERKEY, new RouterMasterKeyHandler(), paramKeyId, masterKeyOUT); + long stopTime = clock.getTime(); + + LOG.info("Got the information about the specified masterKey = {} according to keyId = {}.", + routerMasterKey, paramKeyId); + + FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - startTime); + + // Return query result. + return RouterMasterKeyResponse.newInstance(routerMasterKey); + + } catch (SQLException e) { + FederationStateStoreClientMetrics.failedStateStoreCall(); + FederationStateStoreUtils.logAndThrowRetriableException(e, LOG, + "Unable to obtain the masterKey information according to %s.", + String.valueOf(paramKeyId)); + } + + // Throw exception information + throw new YarnException( + "Unable to obtain the masterKey information according to " + paramKeyId); } + /** + * SQLFederationStateStore Supports Store RMDelegationTokenIdentifier. + * + * Defined the sp_addDelegationToken procedure. + * This procedure requires 4 input parameters, 1 output parameters. + * Input parameters: + * 1. IN sequenceNum_IN int + * 2. IN tokenIdent_IN varchar(1024) + * 3. IN token_IN varchar(1024) + * 4. IN renewDate_IN bigint + * Output parameters: + * 5. OUT rowCount_OUT int + * + * @param request The request contains RouterRMToken (RMDelegationTokenIdentifier and renewDate) + * @return routerRMTokenResponse, the response contains the RouterStoreToken. + * @throws YarnException if the call to the state store is unsuccessful. + * @throws IOException An IO Error occurred. + */ @Override public RouterRMTokenResponse storeNewToken(RouterRMTokenRequest request) throws YarnException, IOException { - throw new NotImplementedException("Code is not implemented"); + + // Step1: Verify parameters to ensure that key fields are not empty. + FederationRouterRMTokenInputValidator.validate(request); + + // Step2. store data in database. + try { + long duration = addOrUpdateToken(request, true); + FederationStateStoreClientMetrics.succeededStateStoreCall(duration); + } catch (SQLException e) { + FederationStateStoreClientMetrics.failedStateStoreCall(); + throw new YarnException(e); + } + + // Step3. Query Data from the database and return the result. + return getTokenByRouterStoreToken(request); } + /** + * SQLFederationStateStore Supports Update RMDelegationTokenIdentifier. + * + * Defined the sp_updateDelegationToken procedure. + * This procedure requires 4 input parameters, 1 output parameters. + * Input parameters: + * 1. IN sequenceNum_IN int + * 2. IN tokenIdent_IN varchar(1024) + * 3. IN token_IN varchar(1024) + * 4. IN renewDate_IN bigint + * Output parameters: + * 5. OUT rowCount_OUT int + * + * @param request The request contains RouterRMToken (RMDelegationTokenIdentifier and renewDate) + * @return routerRMTokenResponse, the response contains the RouterStoreToken. + * @throws YarnException if the call to the state store is unsuccessful. + * @throws IOException An IO Error occurred. + */ @Override public RouterRMTokenResponse updateStoredToken(RouterRMTokenRequest request) throws YarnException, IOException { - throw new NotImplementedException("Code is not implemented"); + + // Step1: Verify parameters to ensure that key fields are not empty. + FederationRouterRMTokenInputValidator.validate(request); + + // Step2. update data in database. + try { + long duration = addOrUpdateToken(request, false); + FederationStateStoreClientMetrics.succeededStateStoreCall(duration); + } catch (SQLException e) { + FederationStateStoreClientMetrics.failedStateStoreCall(); + throw new YarnException(e); + } + + // Step3. Query Data from the database and return the result. + return getTokenByRouterStoreToken(request); } + /** + * Add Or Update RMDelegationTokenIdentifier. + * + * @param request The request contains RouterRMToken (RMDelegationTokenIdentifier and renewDate) + * @param isAdd true, addData; false, updateData. + * @return method operation time. + * @throws IOException An IO Error occurred. + * @throws SQLException An SQL Error occurred. + * @throws YarnException if the call to the state store is unsuccessful. + */ + private long addOrUpdateToken(RouterRMTokenRequest request, boolean isAdd) + throws IOException, SQLException, YarnException { + + // Parse parameters and get KeyId. + RouterStoreToken routerStoreToken = request.getRouterStoreToken(); + YARNDelegationTokenIdentifier identifier = routerStoreToken.getTokenIdentifier(); + String tokenIdentifier = FederationStateStoreUtils.encodeWritable(identifier); + String tokenInfo = routerStoreToken.getTokenInfo(); + long renewDate = routerStoreToken.getRenewDate(); + int sequenceNum = identifier.getSequenceNumber(); + + FederationQueryRunner runner = new FederationQueryRunner(); + FederationSQLOutParameter rowCountOUT = + new FederationSQLOutParameter<>("rowCount_OUT", INTEGER, Integer.class); + + // Execute the query + long startTime = clock.getTime(); + String procedure = isAdd ? CALL_SP_ADD_DELEGATIONTOKEN : CALL_SP_UPDATE_DELEGATIONTOKEN; + Integer rowCount = runner.execute(conn, procedure, new RowCountHandler("rowCount_OUT"), + sequenceNum, tokenIdentifier, tokenInfo, renewDate, rowCountOUT); + long stopTime = clock.getTime(); + + // Get rowCount + // In the process of updating the code, rowCount may be 0 or 1; + // if rowCount=1, it is as expected, indicating that we have updated the Token correctly; + // if rowCount=0, it is not as expected, + // indicating that we have not updated the Token correctly. + if (rowCount != 1) { + FederationStateStoreUtils.logAndThrowStoreException(LOG, + "Wrong behavior during the insertion of delegationToken, tokenId = %s. " + + "Please check the records of the database.", String.valueOf(sequenceNum)); + } + + // return execution time + return (stopTime - startTime); + } + + /** + * SQLFederationStateStore Supports Remove RMDelegationTokenIdentifier. + * + * Defined the sp_deleteDelegationToken procedure. + * This procedure requires 1 input parameters, 1 output parameters. + * Input parameters: + * 1. IN sequenceNum_IN bigint + * Output parameters: + * 2. OUT rowCount_OUT int + * + * @param request The request contains RouterRMToken (RMDelegationTokenIdentifier and renewDate) + * @return routerRMTokenResponse, the response contains the RouterStoreToken. + * @throws YarnException if the call to the state store is unsuccessful. + * @throws IOException An IO Error occurred. + */ @Override public RouterRMTokenResponse removeStoredToken(RouterRMTokenRequest request) throws YarnException, IOException { - throw new NotImplementedException("Code is not implemented"); + + // Step1: Verify parameters to ensure that key fields are not empty. + FederationRouterRMTokenInputValidator.validate(request); + + // Step2: Parse parameters and get KeyId. + RouterStoreToken routerStoreToken = request.getRouterStoreToken(); + YARNDelegationTokenIdentifier identifier = routerStoreToken.getTokenIdentifier(); + int sequenceNum = identifier.getSequenceNumber(); + + try { + + FederationSQLOutParameter rowCountOUT = + new FederationSQLOutParameter<>("rowCount_OUT", INTEGER, Integer.class); + + // Execute the query + long startTime = clock.getTime(); + Integer rowCount = getRowCountByProcedureSQL(CALL_SP_DELETE_DELEGATIONTOKEN, + sequenceNum, rowCountOUT); + long stopTime = clock.getTime(); + + // if it is equal to 0 it means the call + // did not delete the reservation from FederationStateStore + if (rowCount == 0) { + FederationStateStoreUtils.logAndThrowStoreException(LOG, + "TokenId %s does not exist", String.valueOf(sequenceNum)); + } else if (rowCount != 1) { + // if it is different from 1 it means the call + // had a wrong behavior. Maybe the database is not set correctly. + FederationStateStoreUtils.logAndThrowStoreException(LOG, + "Wrong behavior during deleting the delegationToken %s. " + + "The database is expected to delete 1 record, " + + "but the number of deleted records returned by the database is greater than 1, " + + "indicating that a duplicate tokenId occurred during the deletion process.", + String.valueOf(sequenceNum)); + } + + LOG.info("Delete from the StateStore the delegationToken, tokenId = {}.", sequenceNum); + FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - startTime); + return RouterRMTokenResponse.newInstance(routerStoreToken); + } catch (SQLException e) { + FederationStateStoreClientMetrics.failedStateStoreCall(); + FederationStateStoreUtils.logAndThrowRetriableException(e, LOG, + "Unable to delete the delegationToken, tokenId = %s.", sequenceNum); + } + throw new YarnException("Unable to delete the delegationToken, tokenId = " + sequenceNum); } + /** + * The Router Supports GetTokenByRouterStoreToken. + * + * @param request The request contains RouterRMToken (RMDelegationTokenIdentifier and renewDate) + * @return RouterRMTokenResponse. + * @throws YarnException if the call to the state store is unsuccessful. + * @throws IOException An IO Error occurred. + */ @Override public RouterRMTokenResponse getTokenByRouterStoreToken(RouterRMTokenRequest request) throws YarnException, IOException { - throw new NotImplementedException("Code is not implemented"); + // Step1: Verify parameters to ensure that key fields are not empty. + FederationRouterRMTokenInputValidator.validate(request); + + // Step2: Parse parameters and get KeyId. + RouterStoreToken routerStoreToken = request.getRouterStoreToken(); + YARNDelegationTokenIdentifier identifier = routerStoreToken.getTokenIdentifier(); + int sequenceNum = identifier.getSequenceNumber(); + + try { + FederationQueryRunner runner = new FederationQueryRunner(); + FederationSQLOutParameter tokenIdentOUT = + new FederationSQLOutParameter<>("tokenIdent_OUT", VARCHAR, String.class); + FederationSQLOutParameter tokenOUT = + new FederationSQLOutParameter<>("token_OUT", VARCHAR, String.class); + FederationSQLOutParameter renewDateOUT = + new FederationSQLOutParameter<>("renewDate_OUT", BIGINT, Long.class); + + // Execute the query + long startTime = clock.getTime(); + RouterStoreToken resultToken = runner.execute(conn, CALL_SP_GET_DELEGATIONTOKEN, + new RouterStoreTokenHandler(), sequenceNum, tokenIdentOUT, tokenOUT, renewDateOUT); + long stopTime = clock.getTime(); + + FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - startTime); + return RouterRMTokenResponse.newInstance(resultToken); + } catch (SQLException e) { + FederationStateStoreClientMetrics.failedStateStoreCall(); + FederationStateStoreUtils.logAndThrowRetriableException(e, LOG, + "Unable to get the delegationToken, tokenId = %s.", String.valueOf(sequenceNum)); + } + + // Throw exception information + throw new YarnException("Unable to get the delegationToken, tokenId = " + sequenceNum); } + /** + * Call Procedure to get RowCount. + * + * @param procedure procedureSQL. + * @param params procedure params. + * @return RowCount. + * @throws SQLException An exception occurred when calling a stored procedure. + */ + private int getRowCountByProcedureSQL(String procedure, Object... params) throws SQLException { + FederationQueryRunner runner = new FederationQueryRunner(); + // Execute the query + Integer rowCount = runner.execute(conn, procedure, + new RowCountHandler("rowCount_OUT"), params); + return rowCount; + } + + /** + * Increment DelegationToken SeqNum. + * + * @return delegationTokenSeqNum. + */ @Override public int incrementDelegationTokenSeqNum() { - return 0; + return querySequenceTable(YARN_ROUTER_SEQUENCE_NUM, true); } + /** + * Get DelegationToken SeqNum. + * + * @return delegationTokenSeqNum. + */ @Override public int getDelegationTokenSeqNum() { - return 0; + return querySequenceTable(YARN_ROUTER_SEQUENCE_NUM, false); } @Override public void setDelegationTokenSeqNum(int seqNum) { - return; + Connection connection = null; + try { + connection = getConnection(false); + FederationQueryRunner runner = new FederationQueryRunner(); + runner.updateSequenceTable(connection, YARN_ROUTER_SEQUENCE_NUM, seqNum); + } catch (Exception e) { + throw new RuntimeException("Could not update sequence table!!", e); + } finally { + // Return to the pool the CallableStatement + try { + FederationStateStoreUtils.returnToPool(LOG, null, connection); + } catch (YarnException e) { + LOG.error("close connection error.", e); + } + } } + /** + * Get Current KeyId. + * + * @return currentKeyId. + */ @Override public int getCurrentKeyId() { - return 0; + return querySequenceTable(YARN_ROUTER_CURRENT_KEY_ID, false); } + /** + * The Router Supports incrementCurrentKeyId. + * + * @return CurrentKeyId. + */ @Override public int incrementCurrentKeyId() { - return 0; + return querySequenceTable(YARN_ROUTER_CURRENT_KEY_ID, true); + } + + private int querySequenceTable(String sequenceName, boolean isUpdate){ + Connection connection = null; + try { + connection = getConnection(false); + FederationQueryRunner runner = new FederationQueryRunner(); + return runner.selectOrUpdateSequenceTable(connection, sequenceName, isUpdate); + } catch (Exception e) { + throw new RuntimeException("Could not query sequence table!!", e); + } finally { + // Return to the pool the CallableStatement + try { + FederationStateStoreUtils.returnToPool(LOG, null, connection); + } catch (YarnException e) { + LOG.error("close connection error.", e); + } + } } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/RouterRMDTSecretManagerState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/RouterRMDTSecretManagerState.java index 85a8002c91..62a89f419d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/RouterRMDTSecretManagerState.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/RouterRMDTSecretManagerState.java @@ -28,13 +28,13 @@ public class RouterRMDTSecretManagerState { // DTIdentifier -> renewDate - private Map delegationTokenState = new HashMap<>(); + private Map delegationTokenState = new HashMap<>(); private Set masterKeyState = new HashSet<>(); private int dtSequenceNumber = 0; - public Map getTokenState() { + public Map getTokenState() { return delegationTokenState; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/RouterStoreToken.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/RouterStoreToken.java index 29f86903f9..a20297399b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/RouterStoreToken.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/RouterStoreToken.java @@ -18,7 +18,9 @@ package org.apache.hadoop.yarn.server.federation.store.records; import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.classification.InterfaceStability.Stable; import org.apache.hadoop.yarn.security.client.YARNDelegationTokenIdentifier; import org.apache.hadoop.yarn.util.Records; @@ -39,6 +41,17 @@ public static RouterStoreToken newInstance(YARNDelegationTokenIdentifier identif return storeToken; } + @Private + @Unstable + public static RouterStoreToken newInstance(YARNDelegationTokenIdentifier identifier, + Long renewdate, String tokenInfo) { + RouterStoreToken storeToken = Records.newRecord(RouterStoreToken.class); + storeToken.setIdentifier(identifier); + storeToken.setRenewDate(renewdate); + storeToken.setTokenInfo(tokenInfo); + return storeToken; + } + @Private @Unstable public abstract YARNDelegationTokenIdentifier getTokenIdentifier() throws IOException; @@ -47,8 +60,8 @@ public static RouterStoreToken newInstance(YARNDelegationTokenIdentifier identif @Unstable public abstract void setIdentifier(YARNDelegationTokenIdentifier identifier); - @Private - @Unstable + @Public + @Stable public abstract Long getRenewDate(); @Private @@ -62,4 +75,12 @@ public static RouterStoreToken newInstance(YARNDelegationTokenIdentifier identif @Private @Unstable public abstract void readFields(DataInput in) throws IOException; + + @Public + @Stable + public abstract String getTokenInfo(); + + @Private + @Unstable + public abstract void setTokenInfo(String tokenInfo); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/RouterStoreTokenPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/RouterStoreTokenPBImpl.java index df6030a3f0..a89cf06f0b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/RouterStoreTokenPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/RouterStoreTokenPBImpl.java @@ -47,6 +47,7 @@ public class RouterStoreTokenPBImpl extends RouterStoreToken { private YARNDelegationTokenIdentifier rMDelegationTokenIdentifier = null; private Long renewDate; + private String tokenInfo; public RouterStoreTokenPBImpl() { builder = RouterStoreTokenProto.newBuilder(); @@ -84,6 +85,10 @@ private void mergeLocalToBuilder() { if (this.renewDate != null) { builder.setRenewDate(this.renewDate); } + + if (this.tokenInfo != null) { + builder.setTokenInfo(this.tokenInfo); + } } private void maybeInitBuilder() { @@ -164,6 +169,29 @@ public void setRenewDate(Long renewDate) { this.renewDate = renewDate; this.builder.setRenewDate(renewDate); } + @Override + public String getTokenInfo() { + RouterStoreTokenProtoOrBuilder p = viaProto ? proto : builder; + if (this.tokenInfo != null) { + return this.tokenInfo; + } + if (!p.hasTokenInfo()) { + return null; + } + this.tokenInfo = p.getTokenInfo(); + return this.tokenInfo; + } + + @Override + public void setTokenInfo(String tokenInfo) { + maybeInitBuilder(); + if (tokenInfo == null) { + builder.clearTokenInfo(); + return; + } + this.tokenInfo = tokenInfo; + this.builder.setTokenInfo(tokenInfo); + } private YARNDelegationTokenIdentifierProto convertToProtoFormat( YARNDelegationTokenIdentifier delegationTokenIdentifier) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/sql/DatabaseProduct.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/sql/DatabaseProduct.java new file mode 100644 index 0000000000..3b3af24239 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/sql/DatabaseProduct.java @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.federation.store.sql; + +import org.apache.hadoop.classification.InterfaceAudience.Private; + +import java.sql.Connection; +import java.sql.SQLException; + +@Private +public final class DatabaseProduct { + + public enum DbType {MYSQL, SQLSERVER, POSTGRES, UNDEFINED, HSQLDB} + + private static final String SQL_SERVER_NAME = "sqlserver"; + private static final String MYSQL_NAME = "mysql"; + private static final String MARIADB_NAME = "mariadb"; + private static final String HSQLDB_NAME = "hsqldatabase"; + + private DatabaseProduct() { + } + + public static DbType getDbType(Connection conn) throws SQLException { + if (conn == null) { + return DbType.UNDEFINED; + } + String productName = getProductName(conn); + return getDbType(productName); + } + + /** + * We get DBType based on ProductName. + * + * @param productName productName. + * @return DbType. + */ + private static DbType getDbType(String productName) { + DbType dbt; + productName = productName.replaceAll("\\s+", "").toLowerCase(); + if (productName.contains(SQL_SERVER_NAME)) { + dbt = DbType.SQLSERVER; + } else if (productName.contains(MYSQL_NAME) || productName.contains(MARIADB_NAME)) { + dbt = DbType.MYSQL; + } else if (productName.contains(HSQLDB_NAME)) { + dbt = DbType.HSQLDB; + } else { + dbt = DbType.UNDEFINED; + } + return dbt; + } + + /** + * We get ProductName based on metadata in SQL Connection. + * + * @param conn SQL Connection + * @return DB ProductName (Like MySQL SQLSERVER etc.) + */ + private static String getProductName(Connection conn) throws SQLException { + return conn.getMetaData().getDatabaseProductName(); + } + + /** + * We add for update to SQL according to different database types. + * This statement can ensure that a row of records in the database is only updated by one thread. + * + * @param dbType type of database. + * @param selectStatement querySQL. + * @return SQL after adding for update. + * @throws SQLException SQL exception. + */ + public static String addForUpdateClause(DbType dbType, String selectStatement) + throws SQLException { + switch (dbType) { + case MYSQL: + case HSQLDB: + return selectStatement + " for update"; + case SQLSERVER: + String modifier = " with (updlock)"; + int wherePos = selectStatement.toUpperCase().indexOf(" WHERE "); + if (wherePos < 0) { + return selectStatement + modifier; + } + return selectStatement.substring(0, wherePos) + modifier + + selectStatement.substring(wherePos, selectStatement.length()); + default: + String msg = "Unrecognized database product name <" + dbType + ">"; + throw new SQLException(msg); + } + } + + public static boolean isDuplicateKeyError(DbType dbType, SQLException ex) { + switch (dbType) { + case MYSQL: + if((ex.getErrorCode() == 1022 || ex.getErrorCode() == 1062 || ex.getErrorCode() == 1586) && + "23000".equals(ex.getSQLState())) { + return true; + } + break; + case SQLSERVER: + if ((ex.getErrorCode() == 2627 || ex.getErrorCode() == 2601) + && "23000".equals(ex.getSQLState())) { + return true; + } + break; + default: + return false; + } + return false; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/sql/FederationQueryRunner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/sql/FederationQueryRunner.java new file mode 100644 index 0000000000..0cca531d0d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/sql/FederationQueryRunner.java @@ -0,0 +1,310 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.federation.store.sql; + +import org.apache.hadoop.classification.VisibleForTesting; + +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.Statement; +import java.sql.CallableStatement; +import java.sql.ResultSet; +import java.util.Arrays; + +import org.apache.hadoop.yarn.server.federation.store.sql.DatabaseProduct.DbType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.hadoop.yarn.server.federation.store.sql.DatabaseProduct.isDuplicateKeyError; + +/** + * QueryRunner is used to execute stored procedure SQL and parse the returned results. + */ +public class FederationQueryRunner { + + public final static String YARN_ROUTER_SEQUENCE_NUM = "YARN_ROUTER_SEQUENCE_NUM"; + + public final static String YARN_ROUTER_CURRENT_KEY_ID = "YARN_ROUTER_CURRENT_KEY_ID"; + + public final static String QUERY_SEQUENCE_TABLE_SQL = + "SELECT nextVal FROM sequenceTable WHERE sequenceName = %s"; + + public final static String INSERT_SEQUENCE_TABLE_SQL = "" + + "INSERT INTO sequenceTable(sequenceName, nextVal) VALUES(%s, %d)"; + + public final static String UPDATE_SEQUENCE_TABLE_SQL = "" + + "UPDATE sequenceTable SET nextVal = %d WHERE sequenceName = %s"; + + public static final Logger LOG = LoggerFactory.getLogger(FederationQueryRunner.class); + + /** + * Execute Stored Procedure SQL. + * + * @param conn Database Connection. + * @param procedure Stored Procedure SQL. + * @param rsh Result Set handler. + * @param params List of stored procedure parameters. + * @param Generic T. + * @return Stored Procedure Result Set. + * @throws SQLException An exception occurred when calling a stored procedure. + */ + public T execute(Connection conn, String procedure, ResultSetHandler rsh, Object... params) + throws SQLException { + if (conn == null) { + throw new SQLException("Null connection"); + } + + if (procedure == null) { + throw new SQLException("Null Procedure SQL statement"); + } + + if (rsh == null) { + throw new SQLException("Null ResultSetHandler"); + } + + CallableStatement stmt = null; + T results = null; + + try { + stmt = this.getCallableStatement(conn, procedure); + this.fillStatement(stmt, params); + stmt.executeUpdate(); + this.retrieveOutParameters(stmt, params); + results = rsh.handle(params); + } catch (SQLException e) { + this.rethrow(e, procedure, params); + } finally { + close(stmt); + } + return results; + } + + /** + * Get CallableStatement from Conn. + * + * @param conn Database Connection. + * @param procedure Stored Procedure SQL. + * @return CallableStatement. + * @throws SQLException An exception occurred when calling a stored procedure. + */ + @VisibleForTesting + protected CallableStatement getCallableStatement(Connection conn, String procedure) + throws SQLException { + return conn.prepareCall(procedure); + } + + /** + * Set Statement parameters. + * + * @param stmt CallableStatement. + * @param params Stored procedure parameters. + * @throws SQLException An exception occurred when calling a stored procedure. + */ + public void fillStatement(CallableStatement stmt, Object... params) + throws SQLException { + for (int i = 0; i < params.length; i++) { + if (params[i] != null) { + if (stmt != null) { + if (params[i] instanceof FederationSQLOutParameter) { + FederationSQLOutParameter sqlOutParameter = (FederationSQLOutParameter) params[i]; + sqlOutParameter.register(stmt, i + 1); + } else { + stmt.setObject(i + 1, params[i]); + } + } + } + } + } + + /** + * Close Statement. + * + * @param stmt CallableStatement. + * @throws SQLException An exception occurred when calling a stored procedure. + */ + public void close(Statement stmt) throws SQLException { + if (stmt != null) { + stmt.close(); + stmt = null; + } + } + + /** + * Retrieve execution result from CallableStatement. + * + * @param stmt CallableStatement. + * @param params Stored procedure parameters. + * @throws SQLException An exception occurred when calling a stored procedure. + */ + private void retrieveOutParameters(CallableStatement stmt, Object[] params) throws SQLException { + if (params != null && stmt != null) { + for (int i = 0; i < params.length; i++) { + if (params[i] instanceof FederationSQLOutParameter) { + FederationSQLOutParameter sqlOutParameter = (FederationSQLOutParameter) params[i]; + sqlOutParameter.setValue(stmt, i + 1); + } + } + } + } + + /** + * Re-throw SQL exception. + * + * @param cause SQLException. + * @param sql Stored Procedure SQL. + * @param params Stored procedure parameters. + * @throws SQLException An exception occurred when calling a stored procedure. + */ + protected void rethrow(SQLException cause, String sql, Object... params) + throws SQLException { + + String causeMessage = cause.getMessage(); + if (causeMessage == null) { + causeMessage = ""; + } + + StringBuffer msg = new StringBuffer(causeMessage); + msg.append(" Query: "); + msg.append(sql); + msg.append(" Parameters: "); + + if (params == null) { + msg.append("[]"); + } else { + msg.append(Arrays.deepToString(params)); + } + + SQLException e = new SQLException(msg.toString(), cause.getSQLState(), cause.getErrorCode()); + e.setNextException(cause); + throw e; + } + + /** + * We query or update the SequenceTable. + * + * @param connection database conn. + * @param sequenceName sequenceName, We currently have 2 sequences, + * YARN_ROUTER_SEQUENCE_NUM and YARN_ROUTER_CURRENT_KEY_ID. + * @param isUpdate true, means we will update the SequenceTable, + * false, we query the SequenceTable. + * + * @return SequenceValue. + * @throws SQLException An exception occurred when calling a stored procedure. + */ + public int selectOrUpdateSequenceTable(Connection connection, String sequenceName, + boolean isUpdate) throws SQLException { + + int maxSequenceValue = 0; + boolean insertDone = false; + boolean committed = false; + Statement statement = null; + + try { + + // Step1. Query SequenceValue. + while (maxSequenceValue == 0) { + // Query SQL. + String sql = String.format(QUERY_SEQUENCE_TABLE_SQL, quoteString(sequenceName)); + DbType dbType = DatabaseProduct.getDbType(connection); + String forUpdateSQL = DatabaseProduct.addForUpdateClause(dbType, sql); + statement = connection.createStatement(); + ResultSet rs = statement.executeQuery(forUpdateSQL); + if (rs.next()) { + maxSequenceValue = rs.getInt("nextVal"); + } else if (insertDone) { + throw new SQLException("Invalid state of SEQUENCE_TABLE for " + sequenceName); + } else { + insertDone = true; + close(statement); + statement = connection.createStatement(); + String insertSQL = String.format(INSERT_SEQUENCE_TABLE_SQL, quoteString(sequenceName), 1); + try { + statement.executeUpdate(insertSQL); + } catch (SQLException e) { + // If the record is already inserted by some other thread continue to select. + if (isDuplicateKeyError(dbType, e)) { + continue; + } + LOG.error("Unable to insert into SEQUENCE_TABLE for {}.", sequenceName, e); + throw e; + } finally { + close(statement); + } + } + } + + // Step2. Increase SequenceValue. + if (isUpdate) { + int nextSequenceValue = maxSequenceValue + 1; + close(statement); + statement = connection.createStatement(); + String updateSQL = + String.format(UPDATE_SEQUENCE_TABLE_SQL, nextSequenceValue, quoteString(sequenceName)); + statement.executeUpdate(updateSQL); + maxSequenceValue = nextSequenceValue; + } + + connection.commit(); + committed = true; + return maxSequenceValue; + } catch(SQLException e){ + throw new SQLException("Unable to selectOrUpdateSequenceTable due to: " + e.getMessage(), e); + } finally { + if (!committed) { + rollbackDBConn(connection); + } + close(statement); + } + } + + public void updateSequenceTable(Connection connection, String sequenceName, int sequenceValue) + throws SQLException { + String updateSQL = + String.format(UPDATE_SEQUENCE_TABLE_SQL, sequenceValue, quoteString(sequenceName)); + boolean committed = false; + Statement statement = null; + try { + statement = connection.createStatement(); + statement.executeUpdate(updateSQL); + connection.commit(); + committed = true; + } catch (SQLException e) { + throw new SQLException("Unable to updateSequenceTable due to: " + e.getMessage()); + } finally { + if (!committed) { + rollbackDBConn(connection); + } + close(statement); + } + } + + static void rollbackDBConn(Connection dbConn) { + try { + if (dbConn != null && !dbConn.isClosed()) { + dbConn.rollback(); + } + } catch (SQLException e) { + LOG.warn("Failed to rollback db connection ", e); + } + } + + static String quoteString(String input) { + return "'" + input + "'"; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/sql/FederationSQLOutParameter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/sql/FederationSQLOutParameter.java new file mode 100644 index 0000000000..890e3e1e85 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/sql/FederationSQLOutParameter.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.federation.store.sql; + +import java.sql.CallableStatement; +import java.sql.SQLException; + +/** + * SQLOutParameter is used to set the output parameters of the stored procedure. + * @param Generic T. + */ +public class FederationSQLOutParameter { + private final int sqlType; + private final Class javaType; + private T value = null; + private String paramName; + + public FederationSQLOutParameter(String paramName, int sqlType, Class javaType) { + this.paramName = paramName; + this.sqlType = sqlType; + this.javaType = javaType; + } + + public FederationSQLOutParameter(int sqlType, Class javaType, T value) { + this.sqlType = sqlType; + this.javaType = javaType; + this.value = value; + } + + public int getSqlType() { + return sqlType; + } + + public Class getJavaType() { + return javaType; + } + + public T getValue() { + return value; + } + + public void setValue(T value) { + this.value = value; + } + + public String getParamName() { + return paramName; + } + + public void setParamName(String paramName) { + this.paramName = paramName; + } + + void setValue(CallableStatement stmt, int index) throws SQLException { + Object object = stmt.getObject(index); + value = javaType.cast(object); + } + + void register(CallableStatement stmt, int index) throws SQLException { + stmt.registerOutParameter(index, sqlType); + if (value != null) { + stmt.setObject(index, value); + } + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("OutParameter: [") + .append("SqlType: ").append(sqlType).append(", ") + .append("JavaType: ").append(javaType).append(", ") + .append("Value: ").append(value) + .append("]"); + return sb.toString(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/sql/ResultSetHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/sql/ResultSetHandler.java new file mode 100644 index 0000000000..1e793d0ecb --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/sql/ResultSetHandler.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.federation.store.sql; + +import java.sql.SQLException; + +/** + * Result Set Handler. + * + * @param Generic T. + */ +public interface ResultSetHandler { + T handle(Object... params) throws SQLException; +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/sql/RouterMasterKeyHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/sql/RouterMasterKeyHandler.java new file mode 100644 index 0000000000..3f7b3641a5 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/sql/RouterMasterKeyHandler.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.federation.store.sql; + +import org.apache.hadoop.security.token.delegation.DelegationKey; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKey; +import org.apache.hadoop.yarn.server.federation.store.utils.FederationStateStoreUtils; +import org.apache.hadoop.yarn.util.Records; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.sql.SQLException; + +/** + * RouterMasterKey Handler. + * Used to parse the result information of the output parameter into the RouterMasterKey type. + */ +public class RouterMasterKeyHandler implements ResultSetHandler { + + private final static String MASTERKEY_OUT = "masterKey_OUT"; + + @Override + public RouterMasterKey handle(Object... params) throws SQLException { + RouterMasterKey routerMasterKey = Records.newRecord(RouterMasterKey.class); + for (Object param : params) { + if (param instanceof FederationSQLOutParameter) { + FederationSQLOutParameter parameter = (FederationSQLOutParameter) param; + String paramName = parameter.getParamName(); + Object parmaValue = parameter.getValue(); + if (StringUtils.equalsIgnoreCase(paramName, MASTERKEY_OUT)) { + DelegationKey key = getDelegationKey(parmaValue); + routerMasterKey.setKeyId(key.getKeyId()); + routerMasterKey.setKeyBytes(ByteBuffer.wrap(key.getEncodedKey())); + routerMasterKey.setExpiryDate(key.getExpiryDate()); + } + } + } + return routerMasterKey; + } + + private DelegationKey getDelegationKey(Object paramMasterKey) throws SQLException { + try { + DelegationKey key = new DelegationKey(); + String masterKey = String.valueOf(paramMasterKey); + FederationStateStoreUtils.decodeWritable(key, masterKey); + return key; + } catch (IOException e) { + throw new SQLException(e); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/sql/RouterStoreTokenHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/sql/RouterStoreTokenHandler.java new file mode 100644 index 0000000000..465fe706f4 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/sql/RouterStoreTokenHandler.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.federation.store.sql; + +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.security.client.YARNDelegationTokenIdentifier; +import org.apache.hadoop.yarn.server.federation.store.records.RouterStoreToken; +import org.apache.hadoop.yarn.util.Records; + +import java.io.IOException; +import java.sql.SQLException; + +import static org.apache.hadoop.yarn.server.federation.store.utils.FederationStateStoreUtils.decodeWritable; + +/** + * RouterStoreToken Handler. + * Used to parse the result information of the output parameter into the RouterStoreToken type. + */ +public class RouterStoreTokenHandler implements ResultSetHandler { + + private final static String TOKENIDENT_OUT = "tokenIdent_OUT"; + private final static String TOKEN_OUT = "token_OUT"; + private final static String RENEWDATE_OUT = "renewDate_OUT"; + + @Override + public RouterStoreToken handle(Object... params) throws SQLException { + RouterStoreToken storeToken = Records.newRecord(RouterStoreToken.class); + for (Object param : params) { + if (param instanceof FederationSQLOutParameter) { + FederationSQLOutParameter parameter = (FederationSQLOutParameter) param; + String paramName = parameter.getParamName(); + Object parmaValue = parameter.getValue(); + if (StringUtils.equalsIgnoreCase(paramName, TOKENIDENT_OUT)) { + YARNDelegationTokenIdentifier identifier = getYARNDelegationTokenIdentifier(parmaValue); + storeToken.setIdentifier(identifier); + } else if (StringUtils.equalsIgnoreCase(paramName, TOKEN_OUT)) { + String tokenInfo = getTokenInfo(parmaValue); + storeToken.setTokenInfo(tokenInfo); + } else if(StringUtils.equalsIgnoreCase(paramName, RENEWDATE_OUT)){ + Long renewDate = getRenewDate(parmaValue); + storeToken.setRenewDate(renewDate); + } + } + } + return storeToken; + } + + private YARNDelegationTokenIdentifier getYARNDelegationTokenIdentifier(Object tokenIdent) + throws SQLException { + try { + YARNDelegationTokenIdentifier resultIdentifier = + Records.newRecord(YARNDelegationTokenIdentifier.class); + decodeWritable(resultIdentifier, String.valueOf(tokenIdent)); + return resultIdentifier; + } catch (IOException e) { + throw new SQLException(e); + } + } + + private String getTokenInfo(Object tokenInfo) { + return String.valueOf(tokenInfo); + } + + private Long getRenewDate(Object renewDate) { + return Long.parseLong(String.valueOf(renewDate)); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/sql/RowCountHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/sql/RowCountHandler.java new file mode 100644 index 0000000000..e9438d1198 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/sql/RowCountHandler.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.federation.store.sql; + +import org.apache.hadoop.util.StringUtils; + +import java.sql.SQLException; + +/** + * RowCount Handler. + * Used to parse out the rowCount information of the output parameter. + */ +public class RowCountHandler implements ResultSetHandler { + + private String rowCountParamName; + + public RowCountHandler(String paramName) { + this.rowCountParamName = paramName; + } + + @Override + public Integer handle(Object... params) throws SQLException { + Integer result = 0; + for (Object param : params) { + if (param instanceof FederationSQLOutParameter) { + FederationSQLOutParameter parameter = (FederationSQLOutParameter) param; + String paramName = parameter.getParamName(); + Object parmaValue = parameter.getValue(); + if (StringUtils.equalsIgnoreCase(paramName, rowCountParamName)) { + result = getRowCount(parmaValue); + } + } + } + return result; + } + + private Integer getRowCount(Object rowCount) { + return Integer.parseInt(String.valueOf(rowCount)); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/sql/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/sql/package-info.java new file mode 100644 index 0000000000..d6bca3fd23 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/sql/package-info.java @@ -0,0 +1,17 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.yarn.server.federation.store.sql; \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationRouterRMTokenInputValidator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationRouterRMTokenInputValidator.java index 40fe1f36cf..1ad7cc1ce2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationRouterRMTokenInputValidator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationRouterRMTokenInputValidator.java @@ -15,7 +15,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.hadoop.yarn.server.federation.store.utils; import org.apache.hadoop.yarn.security.client.YARNDelegationTokenIdentifier; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationStateStoreUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationStateStoreUtils.java index f14867a0e6..aba8ddac2f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationStateStoreUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationStateStoreUtils.java @@ -18,16 +18,27 @@ package org.apache.hadoop.yarn.server.federation.store.utils; +import java.io.IOException; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.nio.ByteBuffer; import java.sql.CallableStatement; import java.sql.Connection; import java.sql.ResultSet; import java.sql.SQLException; +import java.util.Base64; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.security.token.delegation.DelegationKey; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreException; import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreInvalidInputException; import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreRetriableException; import org.apache.hadoop.yarn.server.federation.store.metrics.FederationStateStoreClientMetrics; +import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKey; +import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKeyRequest; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,7 +47,6 @@ /** * Common utility methods used by the store implementations. - * */ public final class FederationStateStoreUtils { @@ -329,4 +339,61 @@ public static boolean filterHomeSubCluster(SubClusterId filterSubCluster, return false; } + + /** + * Encode for Writable objects. + * This method will convert the writable object to a base64 string. + * + * @param key Writable Key. + * @return base64 string. + * @throws IOException raised on errors performing I/O. + */ + public static String encodeWritable(Writable key) throws IOException { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(bos); + key.write(dos); + dos.flush(); + return Base64.getUrlEncoder().encodeToString(bos.toByteArray()); + } + + /** + * Decode Base64 string to Writable object. + * + * @param w Writable Key. + * @param idStr base64 string. + * @throws IOException raised on errors performing I/O. + */ + public static void decodeWritable(Writable w, String idStr) throws IOException { + DataInputStream in = new DataInputStream( + new ByteArrayInputStream(Base64.getUrlDecoder().decode(idStr))); + w.readFields(in); + } + + /** + * Convert MasterKey to DelegationKey. + * + * Before using this function, + * please use FederationRouterRMTokenInputValidator to verify the request. + * By default, the request is not empty, and the internal object is not empty. + * + * @param request RouterMasterKeyRequest + * @return DelegationKey. + */ + public static DelegationKey convertMasterKeyToDelegationKey(RouterMasterKeyRequest request) { + RouterMasterKey masterKey = request.getRouterMasterKey(); + return convertMasterKeyToDelegationKey(masterKey); + } + + /** + * Convert MasterKey to DelegationKey. + * + * @param masterKey masterKey. + * @return DelegationKey. + */ + private static DelegationKey convertMasterKeyToDelegationKey(RouterMasterKey masterKey) { + ByteBuffer keyByteBuf = masterKey.getKeyBytes(); + byte[] keyBytes = new byte[keyByteBuf.remaining()]; + keyByteBuf.get(keyBytes); + return new DelegationKey(masterKey.getKeyId(), masterKey.getExpiryDate(), keyBytes); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java index 8c36fba1f1..12625a60e9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java @@ -809,6 +809,24 @@ public void storeNewToken(RMDelegationTokenIdentifier identifier, stateStore.storeNewToken(request); } + /** + * The Router Supports Store RMDelegationTokenIdentifier{@link RMDelegationTokenIdentifier}. + * + * @param identifier delegation tokens from the RM. + * @param renewDate renewDate. + * @param tokenInfo tokenInfo. + * @throws YarnException if the call to the state store is unsuccessful. + * @throws IOException An IO Error occurred. + */ + public void storeNewToken(RMDelegationTokenIdentifier identifier, + long renewDate, String tokenInfo) throws YarnException, IOException { + LOG.info("storing RMDelegation token with sequence number: {}.", + identifier.getSequenceNumber()); + RouterStoreToken storeToken = RouterStoreToken.newInstance(identifier, renewDate, tokenInfo); + RouterRMTokenRequest request = RouterRMTokenRequest.newInstance(storeToken); + stateStore.storeNewToken(request); + } + /** * The Router Supports Update RMDelegationTokenIdentifier{@link RMDelegationTokenIdentifier}. * @@ -826,6 +844,24 @@ public void updateStoredToken(RMDelegationTokenIdentifier identifier, stateStore.updateStoredToken(request); } + /** + * The Router Supports Update RMDelegationTokenIdentifier{@link RMDelegationTokenIdentifier}. + * + * @param identifier delegation tokens from the RM + * @param renewDate renewDate + * @param tokenInfo tokenInfo. + * @throws YarnException if the call to the state store is unsuccessful. + * @throws IOException An IO Error occurred. + */ + public void updateStoredToken(RMDelegationTokenIdentifier identifier, + long renewDate, String tokenInfo) throws YarnException, IOException { + LOG.info("updating RMDelegation token with sequence number: {}.", + identifier.getSequenceNumber()); + RouterStoreToken storeToken = RouterStoreToken.newInstance(identifier, renewDate, tokenInfo); + RouterRMTokenRequest request = RouterRMTokenRequest.newInstance(storeToken); + stateStore.updateStoredToken(request); + } + /** * The Router Supports Remove RMDelegationTokenIdentifier{@link RMDelegationTokenIdentifier}. * diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_federation_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_federation_protos.proto index 0544a26e4c..26fc77e01f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_federation_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_federation_protos.proto @@ -234,6 +234,7 @@ message RouterMasterKeyResponseProto { message RouterStoreTokenProto { optional YARNDelegationTokenIdentifierProto token_identifier = 1; optional int64 renew_date = 2; + optional string token_info = 3; } message RouterRMTokenRequestProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java index 7fb1e327e8..c93115ccfd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java @@ -19,6 +19,7 @@ import java.io.IOException; import java.nio.ByteBuffer; +import java.sql.SQLException; import java.util.Calendar; import java.util.List; import java.util.Set; @@ -98,10 +99,10 @@ public abstract class FederationStateStoreBaseTest { protected abstract FederationStateStore createStateStore(); protected abstract void checkRouterMasterKey(DelegationKey delegationKey, - RouterMasterKey routerMasterKey) throws YarnException, IOException; + RouterMasterKey routerMasterKey) throws YarnException, IOException, SQLException; protected abstract void checkRouterStoreToken(RMDelegationTokenIdentifier identifier, - RouterStoreToken token) throws YarnException, IOException; + RouterStoreToken token) throws YarnException, IOException, SQLException; private Configuration conf; @@ -937,16 +938,17 @@ public void testRemoveStoredMasterKey() throws YarnException, IOException { } @Test - public void testStoreNewToken() throws IOException, YarnException { + public void testStoreNewToken() throws IOException, YarnException, SQLException { // prepare parameters RMDelegationTokenIdentifier identifier = new RMDelegationTokenIdentifier( new Text("owner1"), new Text("renewer1"), new Text("realuser1")); int sequenceNumber = 1; identifier.setSequenceNumber(sequenceNumber); Long renewDate = Time.now(); + String tokenInfo = "tokenInfo"; // store new rm-token - RouterStoreToken storeToken = RouterStoreToken.newInstance(identifier, renewDate); + RouterStoreToken storeToken = RouterStoreToken.newInstance(identifier, renewDate, tokenInfo); RouterRMTokenRequest request = RouterRMTokenRequest.newInstance(storeToken); RouterRMTokenResponse routerRMTokenResponse = stateStore.storeNewToken(request); @@ -957,33 +959,33 @@ public void testStoreNewToken() throws IOException, YarnException { Assert.assertNotNull(storeTokenResp); Assert.assertEquals(storeToken.getRenewDate(), storeTokenResp.getRenewDate()); Assert.assertEquals(storeToken.getTokenIdentifier(), storeTokenResp.getTokenIdentifier()); + Assert.assertEquals(storeToken.getTokenInfo(), storeTokenResp.getTokenInfo()); - checkRouterStoreToken(identifier, storeToken); checkRouterStoreToken(identifier, storeTokenResp); } @Test - public void testUpdateStoredToken() throws IOException, YarnException { + public void testUpdateStoredToken() throws IOException, YarnException, SQLException { // prepare saveToken parameters RMDelegationTokenIdentifier identifier = new RMDelegationTokenIdentifier( new Text("owner2"), new Text("renewer2"), new Text("realuser2")); int sequenceNumber = 2; + String tokenInfo = "tokenInfo"; identifier.setSequenceNumber(sequenceNumber); Long renewDate = Time.now(); // store new rm-token - RouterStoreToken storeToken = RouterStoreToken.newInstance(identifier, renewDate); + RouterStoreToken storeToken = RouterStoreToken.newInstance(identifier, renewDate, tokenInfo); RouterRMTokenRequest request = RouterRMTokenRequest.newInstance(storeToken); RouterRMTokenResponse routerRMTokenResponse = stateStore.storeNewToken(request); Assert.assertNotNull(routerRMTokenResponse); // prepare updateToken parameters Long renewDate2 = Time.now(); - int sequenceNumber2 = 3; - identifier.setSequenceNumber(sequenceNumber2); + String tokenInfo2 = "tokenInfo2"; // update rm-token - RouterStoreToken updateToken = RouterStoreToken.newInstance(identifier, renewDate2); + RouterStoreToken updateToken = RouterStoreToken.newInstance(identifier, renewDate2, tokenInfo2); RouterRMTokenRequest updateTokenRequest = RouterRMTokenRequest.newInstance(updateToken); RouterRMTokenResponse updateTokenResponse = stateStore.updateStoredToken(updateTokenRequest); @@ -992,6 +994,7 @@ public void testUpdateStoredToken() throws IOException, YarnException { Assert.assertNotNull(updateTokenResp); Assert.assertEquals(updateToken.getRenewDate(), updateTokenResp.getRenewDate()); Assert.assertEquals(updateToken.getTokenIdentifier(), updateTokenResp.getTokenIdentifier()); + Assert.assertEquals(updateToken.getTokenInfo(), updateTokenResp.getTokenInfo()); checkRouterStoreToken(identifier, updateTokenResp); } @@ -1004,9 +1007,10 @@ public void testRemoveStoredToken() throws IOException, YarnException { int sequenceNumber = 3; identifier.setSequenceNumber(sequenceNumber); Long renewDate = Time.now(); + String tokenInfo = "tokenInfo"; // store new rm-token - RouterStoreToken storeToken = RouterStoreToken.newInstance(identifier, renewDate); + RouterStoreToken storeToken = RouterStoreToken.newInstance(identifier, renewDate, tokenInfo); RouterRMTokenRequest request = RouterRMTokenRequest.newInstance(storeToken); RouterRMTokenResponse routerRMTokenResponse = stateStore.storeNewToken(request); Assert.assertNotNull(routerRMTokenResponse); @@ -1021,16 +1025,17 @@ public void testRemoveStoredToken() throws IOException, YarnException { } @Test - public void testGetTokenByRouterStoreToken() throws IOException, YarnException { + public void testGetTokenByRouterStoreToken() throws IOException, YarnException, SQLException { // prepare saveToken parameters RMDelegationTokenIdentifier identifier = new RMDelegationTokenIdentifier( new Text("owner4"), new Text("renewer4"), new Text("realuser4")); int sequenceNumber = 4; identifier.setSequenceNumber(sequenceNumber); Long renewDate = Time.now(); + String tokenInfo = "tokenInfo"; // store new rm-token - RouterStoreToken storeToken = RouterStoreToken.newInstance(identifier, renewDate); + RouterStoreToken storeToken = RouterStoreToken.newInstance(identifier, renewDate, tokenInfo); RouterRMTokenRequest request = RouterRMTokenRequest.newInstance(storeToken); RouterRMTokenResponse routerRMTokenResponse = stateStore.storeNewToken(request); Assert.assertNotNull(routerRMTokenResponse); @@ -1041,7 +1046,7 @@ public void testGetTokenByRouterStoreToken() throws IOException, YarnException { RouterStoreToken getStoreTokenResp = getRouterRMTokenResp.getRouterStoreToken(); Assert.assertNotNull(getStoreTokenResp); Assert.assertEquals(getStoreTokenResp.getRenewDate(), storeToken.getRenewDate()); - Assert.assertEquals(getStoreTokenResp.getTokenIdentifier(), storeToken.getTokenIdentifier()); + Assert.assertEquals(storeToken.getTokenInfo(), getStoreTokenResp.getTokenInfo()); checkRouterStoreToken(identifier, getStoreTokenResp); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/HSQLDBFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/HSQLDBFederationStateStore.java index b3bb0764df..73b65feb48 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/HSQLDBFederationStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/HSQLDBFederationStateStore.java @@ -75,6 +75,26 @@ public class HSQLDBFederationStateStore extends SQLFederationStateStore { + " homeSubCluster varchar(256) NOT NULL," + " CONSTRAINT pk_reservationId PRIMARY KEY (reservationId))"; + private static final String TABLE_MASTERKEYS = + " CREATE TABLE masterKeys (" + + " keyId bigint NOT NULL," + + " masterKey varchar(1024) NOT NULL," + + " CONSTRAINT pk_keyId PRIMARY KEY (keyId))"; + + private static final String TABLE_DELEGATIONTOKENS = + " CREATE TABLE delegationTokens (" + + " sequenceNum bigint NOT NULL," + + " tokenIdent varchar(1024) NOT NULL," + + " token varchar(1024) NOT NULL," + + " renewDate bigint NOT NULL," + + " CONSTRAINT pk_sequenceNum PRIMARY KEY (sequenceNum))"; + + private static final String TABLE_SEQUENCETABLE = + " CREATE TABLE sequenceTable (" + + " sequenceName varchar(255) NOT NULL," + + " nextVal bigint NOT NULL," + + " CONSTRAINT pk_sequenceName PRIMARY KEY (sequenceName))"; + private static final String SP_REGISTERSUBCLUSTER = "CREATE PROCEDURE sp_registerSubCluster(" + " IN subClusterId_IN varchar(256)," @@ -318,6 +338,99 @@ public class HSQLDBFederationStateStore extends SQLFederationStateStore { + " WHERE reservationId = reservationId_IN;" + " SET rowCount_OUT = 2; END"; + protected static final String SP_DROP_ADDMASTERKEY = "DROP PROCEDURE sp_addMasterKey"; + + protected static final String SP_ADDMASTERKEY = + "CREATE PROCEDURE sp_addMasterKey(" + + " IN keyId_IN int, IN masterKey_IN varchar(1024)," + + " OUT rowCount_OUT int)" + + " MODIFIES SQL DATA BEGIN ATOMIC " + + " INSERT INTO masterKeys(keyId, masterKey)" + + " (SELECT keyId_IN, masterKey_IN" + + " FROM masterKeys " + + " WHERE keyId = keyId_IN " + + " HAVING COUNT(*) = 0);" + + " GET DIAGNOSTICS rowCount_OUT = ROW_COUNT;" + + " END"; + + protected static final String SP_DROP_GETMASTERKEY = "DROP PROCEDURE sp_getMasterKey"; + + protected static final String SP_GETMASTERKEY = + "CREATE PROCEDURE sp_getMasterKey(" + + " IN keyId_IN int," + + " OUT masterKey_OUT varchar(1024))" + + " MODIFIES SQL DATA BEGIN ATOMIC " + + " SELECT masterKey INTO masterKey_OUT " + + " FROM masterKeys " + + " WHERE keyId = keyId_IN; " + + " END "; + + protected static final String SP_DROP_DELETEMASTERKEY = "DROP PROCEDURE sp_deleteMasterKey"; + + protected static final String SP_DELETEMASTERKEY = + "CREATE PROCEDURE sp_deleteMasterKey(" + + " IN keyId_IN int, OUT rowCount_OUT int)" + + " MODIFIES SQL DATA BEGIN ATOMIC" + + " DELETE FROM masterKeys WHERE keyId = keyId_IN;" + + " GET DIAGNOSTICS rowCount_OUT = ROW_COUNT; END"; + + protected static final String SP_DROP_ADD_DELEGATIONTOKEN = + "DROP PROCEDURE sp_addDelegationToken"; + + protected static final String SP_ADD_DELEGATIONTOKEN = + "CREATE PROCEDURE sp_addDelegationToken(" + + " IN sequenceNum_IN bigint, IN tokenIdent_IN varchar(1024)," + + " IN token_IN varchar(1024), IN renewDate_IN bigint, OUT rowCount_OUT int)" + + " MODIFIES SQL DATA BEGIN ATOMIC " + + " INSERT INTO delegationTokens(sequenceNum, tokenIdent, token, renewDate)" + + " (SELECT sequenceNum_IN, tokenIdent_IN, token_IN, renewDate_IN" + + " FROM delegationTokens" + + " WHERE sequenceNum = sequenceNum_IN" + + " HAVING COUNT(*) = 0);" + + " GET DIAGNOSTICS rowCount_OUT = ROW_COUNT;" + + " END"; + + protected static final String SP_DROP_GET_DELEGATIONTOKEN = + "DROP PROCEDURE sp_getDelegationToken"; + + protected static final String SP_GET_DELEGATIONTOKEN = + "CREATE PROCEDURE sp_getDelegationToken(" + + " IN sequenceNum_IN bigint, OUT tokenIdent_OUT varchar(1024), " + + " OUT token_OUT varchar(1024), OUT renewDate_OUT bigint)" + + " MODIFIES SQL DATA BEGIN ATOMIC " + + " SELECT tokenIdent, token, renewDate INTO " + + " tokenIdent_OUT, token_OUT, renewDate_OUT" + + " FROM delegationTokens" + + " WHERE sequenceNum = sequenceNum_IN; " + + " END "; + + protected static final String SP_DROP_UPDATE_DELEGATIONTOKEN = + "DROP PROCEDURE sp_updateDelegationToken"; + + protected static final String SP_UPDATE_DELEGATIONTOKEN = + "CREATE PROCEDURE sp_updateDelegationToken(" + + " IN sequenceNum_IN bigint, IN tokenIdent_IN varchar(1024)," + + " IN token_IN varchar(1024), IN renewDate_IN bigint, OUT rowCount_OUT int)" + + " MODIFIES SQL DATA BEGIN ATOMIC" + + " UPDATE delegationTokens" + + " SET tokenIdent = tokenIdent_IN," + + " token = token_IN, renewDate = renewDate_IN" + + " WHERE sequenceNum = sequenceNum_IN;" + + " GET DIAGNOSTICS rowCount_OUT = ROW_COUNT; " + + " END "; + + protected static final String SP_DROP_DELETE_DELEGATIONTOKEN = + "DROP PROCEDURE sp_deleteDelegationToken"; + + protected static final String SP_DELETE_DELEGATIONTOKEN = + "CREATE PROCEDURE sp_deleteDelegationToken(" + + " IN sequenceNum_IN bigint, OUT rowCount_OUT int)" + + " MODIFIES SQL DATA BEGIN ATOMIC" + + " DELETE FROM delegationTokens" + + " WHERE sequenceNum = sequenceNum_IN;" + + " GET DIAGNOSTICS rowCount_OUT = ROW_COUNT; " + + " END "; + private List tables = new ArrayList<>(); @Override @@ -333,6 +446,9 @@ public void init(Configuration conf) { conn.prepareStatement(TABLE_MEMBERSHIP).execute(); conn.prepareStatement(TABLE_POLICIES).execute(); conn.prepareStatement(TABLE_RESERVATIONSHOMESUBCLUSTER).execute(); + conn.prepareStatement(TABLE_MASTERKEYS).execute(); + conn.prepareStatement(TABLE_DELEGATIONTOKENS).execute(); + conn.prepareStatement(TABLE_SEQUENCETABLE).execute(); conn.prepareStatement(SP_REGISTERSUBCLUSTER).execute(); conn.prepareStatement(SP_DEREGISTERSUBCLUSTER).execute(); @@ -356,6 +472,15 @@ public void init(Configuration conf) { conn.prepareStatement(SP_DELETERESERVATIONHOMESUBCLUSTER).execute(); conn.prepareStatement(SP_UPDATERESERVATIONHOMESUBCLUSTER).execute(); + conn.prepareStatement(SP_ADDMASTERKEY).execute(); + conn.prepareStatement(SP_GETMASTERKEY).execute(); + conn.prepareStatement(SP_DELETEMASTERKEY).execute(); + + conn.prepareStatement(SP_ADD_DELEGATIONTOKEN).execute(); + conn.prepareStatement(SP_GET_DELEGATIONTOKEN).execute(); + conn.prepareStatement(SP_UPDATE_DELEGATIONTOKEN).execute(); + conn.prepareStatement(SP_DELETE_DELEGATIONTOKEN).execute(); + LOG.info("Database Init: Complete"); } catch (Exception e) { LOG.error("ERROR: failed to initialize HSQLDB {}.", e.getMessage()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestMemoryFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestMemoryFederationStateStore.java index 0ea714ff06..5548dab1b8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestMemoryFederationStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestMemoryFederationStateStore.java @@ -78,7 +78,7 @@ protected void checkRouterStoreToken(RMDelegationTokenIdentifier identifier, memoryStateStore.getRouterRMSecretManagerState(); assertNotNull(secretManagerState); - Map tokenStateMap = + Map tokenStateMap = secretManagerState.getTokenState(); assertNotNull(tokenStateMap); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestSQLFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestSQLFederationStateStore.java index befdf48976..91414ebc70 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestSQLFederationStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestSQLFederationStateStore.java @@ -17,7 +17,6 @@ package org.apache.hadoop.yarn.server.federation.store.impl; -import org.apache.commons.lang3.NotImplementedException; import org.apache.hadoop.security.token.delegation.DelegationKey; import org.apache.hadoop.test.LambdaTestUtils; import org.apache.hadoop.util.Time; @@ -37,8 +36,14 @@ import org.apache.hadoop.yarn.server.federation.store.records.DeleteReservationHomeSubClusterRequest; import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKey; import org.apache.hadoop.yarn.server.federation.store.records.RouterStoreToken; +import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKeyRequest; +import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKeyResponse; +import org.apache.hadoop.yarn.server.federation.store.sql.DatabaseProduct; +import org.apache.hadoop.yarn.server.federation.store.sql.FederationSQLOutParameter; +import org.apache.hadoop.yarn.server.federation.store.sql.FederationQueryRunner; +import org.apache.hadoop.yarn.server.federation.store.sql.RouterMasterKeyHandler; +import org.apache.hadoop.yarn.server.federation.store.sql.RouterStoreTokenHandler; import org.apache.hadoop.yarn.server.federation.store.utils.FederationStateStoreUtils; -import org.junit.Assert; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,20 +53,30 @@ import java.sql.Connection; import java.sql.ResultSet; import java.sql.SQLException; +import java.sql.DatabaseMetaData; import java.util.ArrayList; import java.util.List; +import static org.apache.hadoop.yarn.server.federation.store.impl.SQLFederationStateStore.CALL_SP_GET_MASTERKEY; import static org.apache.hadoop.yarn.server.federation.store.impl.SQLFederationStateStore.CALL_SP_ADD_RESERVATION_HOME_SUBCLUSTER; import static org.apache.hadoop.yarn.server.federation.store.impl.SQLFederationStateStore.CALL_SP_GET_RESERVATION_HOME_SUBCLUSTER; import static org.apache.hadoop.yarn.server.federation.store.impl.SQLFederationStateStore.CALL_SP_GET_RESERVATIONS_HOME_SUBCLUSTER; -import static org.apache.hadoop.yarn.server.federation.store.impl.SQLFederationStateStore.CALL_SP_UPDATE_RESERVATION_HOME_SUBCLUSTER; import static org.apache.hadoop.yarn.server.federation.store.impl.SQLFederationStateStore.CALL_SP_DELETE_RESERVATION_HOME_SUBCLUSTER; +import static org.apache.hadoop.yarn.server.federation.store.impl.SQLFederationStateStore.CALL_SP_GET_DELEGATIONTOKEN; +import static org.apache.hadoop.yarn.server.federation.store.impl.SQLFederationStateStore.CALL_SP_UPDATE_RESERVATION_HOME_SUBCLUSTER; +import static org.apache.hadoop.yarn.server.federation.store.sql.DatabaseProduct.DbType; import static org.apache.hadoop.yarn.server.federation.store.impl.HSQLDBFederationStateStore.SP_DROP_ADDRESERVATIONHOMESUBCLUSTER; import static org.apache.hadoop.yarn.server.federation.store.impl.HSQLDBFederationStateStore.SP_ADDRESERVATIONHOMESUBCLUSTER2; import static org.apache.hadoop.yarn.server.federation.store.impl.HSQLDBFederationStateStore.SP_DROP_UPDATERESERVATIONHOMESUBCLUSTER; import static org.apache.hadoop.yarn.server.federation.store.impl.HSQLDBFederationStateStore.SP_UPDATERESERVATIONHOMESUBCLUSTER2; import static org.apache.hadoop.yarn.server.federation.store.impl.HSQLDBFederationStateStore.SP_DROP_DELETERESERVATIONHOMESUBCLUSTER; import static org.apache.hadoop.yarn.server.federation.store.impl.HSQLDBFederationStateStore.SP_DELETERESERVATIONHOMESUBCLUSTER2; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static java.sql.Types.VARCHAR; +import static java.sql.Types.BIGINT; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; /** * Unit tests for SQLFederationStateStore. @@ -74,6 +89,7 @@ public class TestSQLFederationStateStore extends FederationStateStoreBaseTest { private static final String DATABASE_URL = "jdbc:hsqldb:mem:state"; private static final String DATABASE_USERNAME = "SA"; private static final String DATABASE_PASSWORD = ""; + private SQLFederationStateStore sqlFederationStateStore = null; @Override protected FederationStateStore createStateStore() { @@ -90,7 +106,8 @@ protected FederationStateStore createStateStore() { DATABASE_URL + System.currentTimeMillis()); conf.setInt(YarnConfiguration.FEDERATION_STATESTORE_MAX_APPLICATIONS, 10); super.setConf(conf); - return new HSQLDBFederationStateStore(); + sqlFederationStateStore = new HSQLDBFederationStateStore(); + return sqlFederationStateStore; } @Test @@ -103,13 +120,13 @@ public void testSqlConnectionsCreatedCount() throws YarnException { stateStore.registerSubCluster( SubClusterRegisterRequest.newInstance(subClusterInfo)); - Assert.assertEquals(subClusterInfo, querySubClusterInfo(subClusterId)); + assertEquals(subClusterInfo, querySubClusterInfo(subClusterId)); addApplicationHomeSC(appId, subClusterId); - Assert.assertEquals(subClusterId, queryApplicationHomeSC(appId)); + assertEquals(subClusterId, queryApplicationHomeSC(appId)); // Verify if connection is created only once at statestore init - Assert.assertEquals(1, + assertEquals(1, FederationStateStoreClientMetrics.getNumConnections()); } @@ -125,8 +142,7 @@ class ReservationHomeSC { } } - private ReservationHomeSC addReservationHomeSubCluster( - SQLFederationStateStore sqlFederationStateStore, String procedure, + private ReservationHomeSC addReservationHomeSubCluster(String procedure, String reservationId, String subHomeClusterId) throws SQLException, YarnException { // procedure call parameter preparation CallableStatement cstmt = sqlFederationStateStore.getCallableStatement(procedure); @@ -148,8 +164,7 @@ private ReservationHomeSC addReservationHomeSubCluster( return new ReservationHomeSC(reservationId, dbStoredHomeSubCluster, dbRowCount); } - private ReservationHomeSC getReservationHomeSubCluster( - SQLFederationStateStore sqlFederationStateStore, String procedure, + private ReservationHomeSC getReservationHomeSubCluster(String procedure, String reservationId) throws SQLException, YarnException { // procedure call parameter preparation @@ -170,8 +185,7 @@ private ReservationHomeSC getReservationHomeSubCluster( return new ReservationHomeSC(reservationId, dBSubClusterHomeId, 0); } - private List getReservationsHomeSubCluster( - SQLFederationStateStore sqlFederationStateStore, String procedure) + private List getReservationsHomeSubCluster(String procedure) throws SQLException, IOException, YarnException { List results = new ArrayList<>(); @@ -197,8 +211,7 @@ private List getReservationsHomeSubCluster( return results; } - private ReservationHomeSC updateReservationHomeSubCluster( - SQLFederationStateStore sqlFederationStateStore, String procedure, + private ReservationHomeSC updateReservationHomeSubCluster(String procedure, String reservationId, String subHomeClusterId) throws SQLException, IOException { @@ -222,8 +235,7 @@ private ReservationHomeSC updateReservationHomeSubCluster( return new ReservationHomeSC(reservationId, subHomeClusterId, rowCount); } - private ReservationHomeSC deleteReservationHomeSubCluster( - SQLFederationStateStore sqlFederationStateStore, String procedure, + private ReservationHomeSC deleteReservationHomeSubCluster(String procedure, String reservationId) throws SQLException { // procedure call parameter preparation CallableStatement cstmt = sqlFederationStateStore.getCallableStatement(procedure); @@ -254,21 +266,17 @@ private ReservationHomeSC deleteReservationHomeSubCluster( */ @Test public void testCheckAddReservationHomeSubCluster() throws Exception { - FederationStateStore stateStore = getStateStore(); - Assert.assertTrue(stateStore instanceof SQLFederationStateStore); - - SQLFederationStateStore sqlFederationStateStore = (SQLFederationStateStore) stateStore; // procedure call parameter preparation ReservationId reservationId = ReservationId.newInstance(Time.now(), 1); String subHomeClusterId = "SC-1"; - ReservationHomeSC resultHC = addReservationHomeSubCluster(sqlFederationStateStore, + ReservationHomeSC resultHC = addReservationHomeSubCluster( CALL_SP_ADD_RESERVATION_HOME_SUBCLUSTER, reservationId.toString(), subHomeClusterId); // validation results - Assert.assertNotNull(resultHC); - Assert.assertEquals(subHomeClusterId, resultHC.subHomeClusterId); - Assert.assertEquals(1, resultHC.dbUpdateCount); + assertNotNull(resultHC); + assertEquals(subHomeClusterId, resultHC.subHomeClusterId); + assertEquals(1, resultHC.dbUpdateCount); } /** @@ -282,24 +290,20 @@ public void testCheckAddReservationHomeSubCluster() throws Exception { */ @Test public void testCheckGetReservationHomeSubCluster() throws Exception { - FederationStateStore stateStore = getStateStore(); - Assert.assertTrue(stateStore instanceof SQLFederationStateStore); - - SQLFederationStateStore sqlFederationStateStore = (SQLFederationStateStore) stateStore; // procedure call parameter preparation ReservationId reservationId = ReservationId.newInstance(Time.now(), 1); String subHomeClusterId = "SC-1"; - addReservationHomeSubCluster(sqlFederationStateStore, + addReservationHomeSubCluster( CALL_SP_ADD_RESERVATION_HOME_SUBCLUSTER, reservationId.toString(), subHomeClusterId); // Call getReservationHomeSubCluster to get the result - ReservationHomeSC resultHC = getReservationHomeSubCluster(sqlFederationStateStore, + ReservationHomeSC resultHC = getReservationHomeSubCluster( CALL_SP_GET_RESERVATION_HOME_SUBCLUSTER, reservationId.toString()); - Assert.assertNotNull(resultHC); - Assert.assertEquals(subHomeClusterId, resultHC.subHomeClusterId); - Assert.assertEquals(reservationId.toString(), resultHC.reservationId); + assertNotNull(resultHC); + assertEquals(subHomeClusterId, resultHC.subHomeClusterId); + assertEquals(reservationId.toString(), resultHC.reservationId); } /** @@ -315,38 +319,34 @@ public void testCheckGetReservationHomeSubCluster() throws Exception { */ @Test public void testCheckGetReservationsHomeSubCluster() throws Exception { - FederationStateStore stateStore = getStateStore(); - Assert.assertTrue(stateStore instanceof SQLFederationStateStore); - - SQLFederationStateStore sqlFederationStateStore = (SQLFederationStateStore) stateStore; // add 1st record ReservationId reservationId1 = ReservationId.newInstance(Time.now(), 1); String subHomeClusterId1 = "SC-1"; - addReservationHomeSubCluster(sqlFederationStateStore, + addReservationHomeSubCluster( CALL_SP_ADD_RESERVATION_HOME_SUBCLUSTER, reservationId1.toString(), subHomeClusterId1); // add 2nd record ReservationId reservationId2 = ReservationId.newInstance(Time.now(), 2); String subHomeClusterId2 = "SC-2"; - addReservationHomeSubCluster(sqlFederationStateStore, + addReservationHomeSubCluster( CALL_SP_ADD_RESERVATION_HOME_SUBCLUSTER, reservationId2.toString(), subHomeClusterId2); List reservationHomeSubClusters = getReservationsHomeSubCluster( - sqlFederationStateStore, CALL_SP_GET_RESERVATIONS_HOME_SUBCLUSTER); + CALL_SP_GET_RESERVATIONS_HOME_SUBCLUSTER); - Assert.assertNotNull(reservationHomeSubClusters); - Assert.assertEquals(2, reservationHomeSubClusters.size()); + assertNotNull(reservationHomeSubClusters); + assertEquals(2, reservationHomeSubClusters.size()); ReservationHomeSC resultHC1 = reservationHomeSubClusters.get(0); - Assert.assertNotNull(resultHC1); - Assert.assertEquals(reservationId1.toString(), resultHC1.reservationId); - Assert.assertEquals(subHomeClusterId1, resultHC1.subHomeClusterId); + assertNotNull(resultHC1); + assertEquals(reservationId1.toString(), resultHC1.reservationId); + assertEquals(subHomeClusterId1, resultHC1.subHomeClusterId); ReservationHomeSC resultHC2 = reservationHomeSubClusters.get(1); - Assert.assertNotNull(resultHC2); - Assert.assertEquals(reservationId2.toString(), resultHC2.reservationId); - Assert.assertEquals(subHomeClusterId2, resultHC2.subHomeClusterId); + assertNotNull(resultHC2); + assertEquals(reservationId2.toString(), resultHC2.reservationId); + assertEquals(subHomeClusterId2, resultHC2.subHomeClusterId); } /** @@ -364,37 +364,33 @@ public void testCheckGetReservationsHomeSubCluster() throws Exception { */ @Test public void testCheckUpdateReservationHomeSubCluster() throws Exception { - FederationStateStore stateStore = getStateStore(); - Assert.assertTrue(stateStore instanceof SQLFederationStateStore); - - SQLFederationStateStore sqlFederationStateStore = (SQLFederationStateStore) stateStore; // procedure call parameter preparation ReservationId reservationId = ReservationId.newInstance(Time.now(), 1); String subHomeClusterId = "SC-1"; - addReservationHomeSubCluster(sqlFederationStateStore, + addReservationHomeSubCluster( CALL_SP_ADD_RESERVATION_HOME_SUBCLUSTER, reservationId.toString(), subHomeClusterId); // verify that the subHomeClusterId corresponding to reservationId is SC-1 - ReservationHomeSC resultHC = getReservationHomeSubCluster(sqlFederationStateStore, + ReservationHomeSC resultHC = getReservationHomeSubCluster( CALL_SP_GET_RESERVATION_HOME_SUBCLUSTER, reservationId.toString()); - Assert.assertNotNull(resultHC); - Assert.assertEquals(subHomeClusterId, resultHC.subHomeClusterId); + assertNotNull(resultHC); + assertEquals(subHomeClusterId, resultHC.subHomeClusterId); // prepare to update parameters String newSubHomeClusterId = "SC-2"; ReservationHomeSC reservationHomeSubCluster = - updateReservationHomeSubCluster(sqlFederationStateStore, + updateReservationHomeSubCluster( CALL_SP_UPDATE_RESERVATION_HOME_SUBCLUSTER, reservationId.toString(), newSubHomeClusterId); - Assert.assertNotNull(reservationHomeSubCluster); - Assert.assertEquals(1, reservationHomeSubCluster.dbUpdateCount); + assertNotNull(reservationHomeSubCluster); + assertEquals(1, reservationHomeSubCluster.dbUpdateCount); // verify that the subHomeClusterId corresponding to reservationId is SC-2 - ReservationHomeSC resultHC2 = getReservationHomeSubCluster(sqlFederationStateStore, + ReservationHomeSC resultHC2 = getReservationHomeSubCluster( CALL_SP_GET_RESERVATION_HOME_SUBCLUSTER, reservationId.toString()); - Assert.assertNotNull(resultHC2); - Assert.assertEquals(newSubHomeClusterId, resultHC2.subHomeClusterId); + assertNotNull(resultHC2); + assertEquals(newSubHomeClusterId, resultHC2.subHomeClusterId); } /** @@ -411,29 +407,25 @@ public void testCheckUpdateReservationHomeSubCluster() throws Exception { */ @Test public void testCheckDeleteReservationHomeSubCluster() throws Exception { - FederationStateStore stateStore = getStateStore(); - Assert.assertTrue(stateStore instanceof SQLFederationStateStore); - - SQLFederationStateStore sqlFederationStateStore = (SQLFederationStateStore) stateStore; // procedure call parameter preparation ReservationId reservationId = ReservationId.newInstance(Time.now(), 1); String subHomeClusterId = "SC-1"; - addReservationHomeSubCluster(sqlFederationStateStore, + addReservationHomeSubCluster( CALL_SP_ADD_RESERVATION_HOME_SUBCLUSTER, reservationId.toString(), subHomeClusterId); // call the delete method of the reservation - ReservationHomeSC resultHC = deleteReservationHomeSubCluster(sqlFederationStateStore, + ReservationHomeSC resultHC = deleteReservationHomeSubCluster( CALL_SP_DELETE_RESERVATION_HOME_SUBCLUSTER, reservationId.toString()); - Assert.assertNotNull(resultHC); - Assert.assertEquals(1, resultHC.dbUpdateCount); + assertNotNull(resultHC); + assertEquals(1, resultHC.dbUpdateCount); // call getReservationHomeSubCluster to get the result - ReservationHomeSC resultHC1 = getReservationHomeSubCluster(sqlFederationStateStore, + ReservationHomeSC resultHC1 = getReservationHomeSubCluster( CALL_SP_GET_RESERVATION_HOME_SUBCLUSTER, reservationId.toString()); - Assert.assertNotNull(resultHC1); - Assert.assertEquals(null, resultHC1.subHomeClusterId); + assertNotNull(resultHC1); + assertEquals(null, resultHC1.subHomeClusterId); } /** @@ -446,10 +438,6 @@ public void testCheckDeleteReservationHomeSubCluster() throws Exception { */ @Test public void testAddReservationHomeSubClusterAbnormalSituation() throws Exception { - FederationStateStore stateStore = getStateStore(); - Assert.assertTrue(stateStore instanceof SQLFederationStateStore); - - SQLFederationStateStore sqlFederationStateStore = (SQLFederationStateStore) stateStore; Connection conn = sqlFederationStateStore.getConn(); conn.prepareStatement(SP_DROP_ADDRESERVATIONHOMESUBCLUSTER).execute(); @@ -470,7 +458,7 @@ public void testAddReservationHomeSubClusterAbnormalSituation() throws Exception "please check the records of the database.", subClusterId, reservationId); LambdaTestUtils.intercept(YarnException.class, errorMsg, - () -> stateStore.addReservationHomeSubCluster(request)); + () -> sqlFederationStateStore.addReservationHomeSubCluster(request)); } /** @@ -483,10 +471,6 @@ public void testAddReservationHomeSubClusterAbnormalSituation() throws Exception */ @Test public void testUpdateReservationHomeSubClusterAbnormalSituation() throws Exception { - FederationStateStore stateStore = getStateStore(); - Assert.assertTrue(stateStore instanceof SQLFederationStateStore); - - SQLFederationStateStore sqlFederationStateStore = (SQLFederationStateStore) stateStore; Connection conn = sqlFederationStateStore.getConn(); conn.prepareStatement(SP_DROP_UPDATERESERVATIONHOMESUBCLUSTER).execute(); @@ -500,7 +484,7 @@ public void testUpdateReservationHomeSubClusterAbnormalSituation() throws Except ReservationHomeSubCluster.newInstance(reservationId, subClusterId1); AddReservationHomeSubClusterRequest addRequest = AddReservationHomeSubClusterRequest.newInstance(reservationHomeSubCluster); - stateStore.addReservationHomeSubCluster(addRequest); + sqlFederationStateStore.addReservationHomeSubCluster(addRequest); SubClusterId subClusterId2 = SubClusterId.newInstance("SC2"); ReservationHomeSubCluster reservationHomeSubCluster2 = @@ -516,7 +500,7 @@ public void testUpdateReservationHomeSubClusterAbnormalSituation() throws Except subClusterId2, reservationId); LambdaTestUtils.intercept(YarnException.class, errorMsg, - () -> stateStore.updateReservationHomeSubCluster(updateRequest)); + () -> sqlFederationStateStore.updateReservationHomeSubCluster(updateRequest)); } /** @@ -529,10 +513,6 @@ public void testUpdateReservationHomeSubClusterAbnormalSituation() throws Except */ @Test public void testDeleteReservationHomeSubClusterAbnormalSituation() throws Exception { - FederationStateStore stateStore = getStateStore(); - Assert.assertTrue(stateStore instanceof SQLFederationStateStore); - - SQLFederationStateStore sqlFederationStateStore = (SQLFederationStateStore) stateStore; Connection conn = sqlFederationStateStore.getConn(); conn.prepareStatement(SP_DROP_DELETERESERVATIONHOMESUBCLUSTER).execute(); @@ -546,7 +526,7 @@ public void testDeleteReservationHomeSubClusterAbnormalSituation() throws Except ReservationHomeSubCluster.newInstance(reservationId, subClusterId1); AddReservationHomeSubClusterRequest addRequest = AddReservationHomeSubClusterRequest.newInstance(reservationHomeSubCluster); - stateStore.addReservationHomeSubCluster(addRequest); + sqlFederationStateStore.addReservationHomeSubCluster(addRequest); DeleteReservationHomeSubClusterRequest delRequest = DeleteReservationHomeSubClusterRequest.newInstance(reservationId); @@ -559,55 +539,92 @@ public void testDeleteReservationHomeSubClusterAbnormalSituation() throws Except reservationId); LambdaTestUtils.intercept(YarnException.class, errorMsg, - () -> stateStore.deleteReservationHomeSubCluster(delRequest)); - } - - @Test(expected = NotImplementedException.class) - public void testStoreNewMasterKey() throws Exception { - super.testStoreNewMasterKey(); - } - - @Test(expected = NotImplementedException.class) - public void testGetMasterKeyByDelegationKey() throws YarnException, IOException { - super.testGetMasterKeyByDelegationKey(); - } - - @Test(expected = NotImplementedException.class) - public void testRemoveStoredMasterKey() throws YarnException, IOException { - super.testRemoveStoredMasterKey(); - } - - @Test(expected = NotImplementedException.class) - public void testStoreNewToken() throws IOException, YarnException { - super.testStoreNewToken(); - } - - @Test(expected = NotImplementedException.class) - public void testUpdateStoredToken() throws IOException, YarnException { - super.testUpdateStoredToken(); - } - - @Test(expected = NotImplementedException.class) - public void testRemoveStoredToken() throws IOException, YarnException { - super.testRemoveStoredToken(); - } - - @Test(expected = NotImplementedException.class) - public void testGetTokenByRouterStoreToken() throws IOException, YarnException { - super.testGetTokenByRouterStoreToken(); + () -> sqlFederationStateStore.deleteReservationHomeSubCluster(delRequest)); } @Override protected void checkRouterMasterKey(DelegationKey delegationKey, - RouterMasterKey routerMasterKey) throws YarnException, IOException { - // TODO: This part of the code will be completed in YARN-11349 and - // will be used to verify whether the RouterMasterKey stored in the DB is as expected. + RouterMasterKey routerMasterKey) throws YarnException, IOException, SQLException { + // Check for MasterKey stored in DB. + RouterMasterKeyRequest routerMasterKeyRequest = + RouterMasterKeyRequest.newInstance(routerMasterKey); + + // Query Data from DB. + Connection conn = sqlFederationStateStore.getConn(); + int paramKeyId = delegationKey.getKeyId(); + FederationQueryRunner runner = new FederationQueryRunner(); + FederationSQLOutParameter masterKeyOUT = + new FederationSQLOutParameter<>("masterKey_OUT", VARCHAR, String.class); + RouterMasterKey sqlRouterMasterKey = runner.execute( + conn, CALL_SP_GET_MASTERKEY, new RouterMasterKeyHandler(), paramKeyId, masterKeyOUT); + + // Check Data. + RouterMasterKeyResponse response = getStateStore(). + getMasterKeyByDelegationKey(routerMasterKeyRequest); + assertNotNull(response); + RouterMasterKey respRouterMasterKey = response.getRouterMasterKey(); + assertEquals(routerMasterKey, respRouterMasterKey); + assertEquals(routerMasterKey, sqlRouterMasterKey); + assertEquals(sqlRouterMasterKey, respRouterMasterKey); } @Override protected void checkRouterStoreToken(RMDelegationTokenIdentifier identifier, - RouterStoreToken token) throws YarnException, IOException { - // TODO: This part of the code will be completed in YARN-11349 and - // will be used to verify whether the RouterStoreToken stored in the DB is as expected. + RouterStoreToken token) throws YarnException, IOException, SQLException { + // Get SequenceNum. + int sequenceNum = identifier.getSequenceNumber(); + + // Query Data from DB. + Connection conn = sqlFederationStateStore.getConn(); + FederationQueryRunner runner = new FederationQueryRunner(); + FederationSQLOutParameter tokenIdentOUT = + new FederationSQLOutParameter<>("tokenIdent_OUT", VARCHAR, String.class); + FederationSQLOutParameter tokenOUT = + new FederationSQLOutParameter<>("token_OUT", VARCHAR, String.class); + FederationSQLOutParameter renewDateOUT = + new FederationSQLOutParameter<>("renewDate_OUT", BIGINT, Long.class); + RouterStoreToken sqlRouterStoreToken = runner.execute(conn, CALL_SP_GET_DELEGATIONTOKEN, + new RouterStoreTokenHandler(), sequenceNum, tokenIdentOUT, tokenOUT, renewDateOUT); + + assertEquals(token, sqlRouterStoreToken); + } + + @Test + public void testCheckHSQLDB() throws SQLException { + Connection conn = sqlFederationStateStore.getConn(); + DbType dbType = DatabaseProduct.getDbType(conn); + assertEquals(DbType.HSQLDB, dbType); + } + + @Test + public void testGetDbTypeNullConn() throws SQLException { + DbType dbType = DatabaseProduct.getDbType(null); + assertEquals(DbType.UNDEFINED, dbType); + } + + @Test + public void testGetDBTypeEmptyConn() throws SQLException { + Connection connection = mock(Connection.class); + DatabaseMetaData metaData = mock(DatabaseMetaData.class); + when(metaData.getDatabaseProductName()).thenReturn(""); + when(connection.getMetaData()).thenReturn(metaData); + DbType dbType = DatabaseProduct.getDbType(connection); + assertEquals(DbType.UNDEFINED, dbType); + } + + @Test + public void testCheckForHSQLDBUpdateSQL() throws SQLException { + String sql = "select sequenceName, nextVal from sequenceTable"; + String hsqlDBSQL = DatabaseProduct.addForUpdateClause(DbType.HSQLDB, sql); + String expectUpdateSQL = "select sequenceName, nextVal from sequenceTable for update"; + assertEquals(expectUpdateSQL, hsqlDBSQL); + } + + @Test + public void testCheckForSqlServerDBUpdateSQL() throws SQLException { + String sql = "select sequenceName, nextVal from sequenceTable"; + String sqlServerDBSQL = DatabaseProduct.addForUpdateClause(DbType.SQLSERVER, sql); + String expectUpdateSQL = "select sequenceName, nextVal from sequenceTable with (updlock)"; + assertEquals(expectUpdateSQL, sqlServerDBSQL); } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/security/token/delegation/RouterDelegationTokenSupport.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/security/token/delegation/RouterDelegationTokenSupport.java new file mode 100644 index 0000000000..d530f751cb --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/security/token/delegation/RouterDelegationTokenSupport.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.security.token.delegation; + +import org.apache.hadoop.io.WritableUtils; +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.Base64; + +/** + * Workaround for serialization of {@link DelegationTokenInformation} through package access. + * Future version of Hadoop should add this to DelegationTokenInformation itself. + */ +public final class RouterDelegationTokenSupport { + + private RouterDelegationTokenSupport() { + } + + public static String encodeDelegationTokenInformation(DelegationTokenInformation token) { + try { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + DataOutputStream out = new DataOutputStream(bos); + WritableUtils.writeVInt(out, token.password.length); + out.write(token.password); + out.writeLong(token.renewDate); + out.flush(); + byte[] tokenInfoBytes = bos.toByteArray(); + return Base64.getUrlEncoder().encodeToString(tokenInfoBytes); + } catch (IOException ex) { + throw new RuntimeException("Failed to encode token.", ex); + } + } + + public static DelegationTokenInformation decodeDelegationTokenInformation(byte[] tokenBytes) + throws IOException { + DataInputStream in = new DataInputStream(new ByteArrayInputStream(tokenBytes)); + DelegationTokenInformation token = new DelegationTokenInformation(0, null); + int len = WritableUtils.readVInt(in); + token.password = new byte[len]; + in.readFully(token.password); + token.renewDate = in.readLong(); + return token; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/security/token/delegation/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/security/token/delegation/package-info.java new file mode 100644 index 0000000000..3a1cb3e69a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/security/token/delegation/package-info.java @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** Router security token delegation. **/ +package org.apache.hadoop.security.token.delegation; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/security/RouterDelegationTokenSecretManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/security/RouterDelegationTokenSecretManager.java index 918bf16e4f..57d2aaa4bf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/security/RouterDelegationTokenSecretManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/security/RouterDelegationTokenSecretManager.java @@ -21,13 +21,16 @@ import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager; import org.apache.hadoop.security.token.delegation.DelegationKey; +import org.apache.hadoop.security.token.delegation.RouterDelegationTokenSupport; import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.security.client.YARNDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKey; import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKeyResponse; import org.apache.hadoop.yarn.server.federation.store.records.RouterRMTokenResponse; +import org.apache.hadoop.yarn.server.federation.store.records.RouterStoreToken; import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,6 +41,7 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; +import java.util.Base64; /** * A Router specific delegation token secret manager. @@ -137,6 +141,29 @@ public void storeNewToken(RMDelegationTokenIdentifier identifier, } } + /** + * The Router Supports Store new Token. + * + * @param identifier RMDelegationToken. + * @param tokenInfo DelegationTokenInformation. + */ + public void storeNewToken(RMDelegationTokenIdentifier identifier, + DelegationTokenInformation tokenInfo) { + try { + String token = + RouterDelegationTokenSupport.encodeDelegationTokenInformation(tokenInfo); + long renewDate = tokenInfo.getRenewDate(); + + federationFacade.storeNewToken(identifier, renewDate, token); + } catch (Exception e) { + if (!shouldIgnoreException(e)) { + LOG.error("Error in storing RMDelegationToken with sequence number: {}.", + identifier.getSequenceNumber()); + ExitUtil.terminate(1, e); + } + } + } + /** * The Router Supports Update Token. * @@ -157,6 +184,27 @@ public void updateStoredToken(RMDelegationTokenIdentifier id, long renewDate) th } } + /** + * The Router Supports Update Token. + * + * @param identifier RMDelegationToken. + * @param tokenInfo DelegationTokenInformation. + */ + public void updateStoredToken(RMDelegationTokenIdentifier identifier, + DelegationTokenInformation tokenInfo) { + try { + long renewDate = tokenInfo.getRenewDate(); + String token = RouterDelegationTokenSupport.encodeDelegationTokenInformation(tokenInfo); + federationFacade.updateStoredToken(identifier, renewDate, token); + } catch (Exception e) { + if (!shouldIgnoreException(e)) { + LOG.error("Error in updating persisted RMDelegationToken with sequence number: {}.", + identifier.getSequenceNumber()); + ExitUtil.terminate(1, e); + } + } + } + /** * The Router Supports Remove Token. * @@ -267,6 +315,42 @@ protected synchronized int incrementDelegationTokenSeqNum() { return federationFacade.incrementDelegationTokenSeqNum(); } + @Override + protected void storeToken(RMDelegationTokenIdentifier rmDelegationTokenIdentifier, + DelegationTokenInformation tokenInfo) throws IOException { + this.currentTokens.put(rmDelegationTokenIdentifier, tokenInfo); + this.addTokenForOwnerStats(rmDelegationTokenIdentifier); + storeNewToken(rmDelegationTokenIdentifier, tokenInfo); + } + + @Override + protected void updateToken(RMDelegationTokenIdentifier rmDelegationTokenIdentifier, + DelegationTokenInformation tokenInfo) throws IOException { + this.currentTokens.put(rmDelegationTokenIdentifier, tokenInfo); + updateStoredToken(rmDelegationTokenIdentifier, tokenInfo); + } + + @Override + protected DelegationTokenInformation getTokenInfo( + RMDelegationTokenIdentifier ident) { + // First check if I have this.. + DelegationTokenInformation tokenInfo = currentTokens.get(ident); + if (tokenInfo == null) { + try { + RouterRMTokenResponse response = federationFacade.getTokenByRouterStoreToken(ident); + RouterStoreToken routerStoreToken = response.getRouterStoreToken(); + String tokenStr = routerStoreToken.getTokenInfo(); + byte[] tokenBytes = Base64.getUrlDecoder().decode(tokenStr); + tokenInfo = RouterDelegationTokenSupport.decodeDelegationTokenInformation(tokenBytes); + } catch (Exception e) { + LOG.error("Error retrieving tokenInfo [" + ident.getSequenceNumber() + + "] from StateStore.", e); + throw new YarnRuntimeException(e); + } + } + return tokenInfo; + } + @Override protected synchronized int getDelegationTokenSeqNum() { return federationFacade.getDelegationTokenSeqNum(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java index 2488fc73b0..6f7248a086 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java @@ -138,6 +138,7 @@ import org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager; import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore; import org.apache.hadoop.yarn.server.federation.store.records.RouterRMDTSecretManagerState; +import org.apache.hadoop.yarn.server.federation.store.records.RouterStoreToken; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; @@ -1617,14 +1618,17 @@ public void testGetDelegationToken() throws IOException, YarnException { RouterRMDTSecretManagerState managerState = stateStore.getRouterRMSecretManagerState(); Assert.assertNotNull(managerState); - Map delegationTokenState = managerState.getTokenState(); + Map delegationTokenState = + managerState.getTokenState(); Assert.assertNotNull(delegationTokenState); Assert.assertTrue(delegationTokenState.containsKey(rMDelegationTokenIdentifier)); long tokenRenewInterval = this.getConf().getLong( YarnConfiguration.RM_DELEGATION_TOKEN_RENEW_INTERVAL_KEY, YarnConfiguration.RM_DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT); - long renewDate = delegationTokenState.get(rMDelegationTokenIdentifier); + RouterStoreToken resultRouterStoreToken = delegationTokenState.get(rMDelegationTokenIdentifier); + Assert.assertNotNull(resultRouterStoreToken); + long renewDate = resultRouterStoreToken.getRenewDate(); Assert.assertEquals(issueDate + tokenRenewInterval, renewDate); } @@ -1667,10 +1671,13 @@ public void testRenewDelegationToken() throws IOException, YarnException { // Step3. Compare whether the expirationTime returned to // the client is consistent with the renewDate in the stateStore RouterRMDTSecretManagerState managerState = stateStore.getRouterRMSecretManagerState(); - Map delegationTokenState = managerState.getTokenState(); + Map delegationTokenState = + managerState.getTokenState(); Assert.assertNotNull(delegationTokenState); Assert.assertTrue(delegationTokenState.containsKey(rMDelegationTokenIdentifier)); - long renewDate = delegationTokenState.get(rMDelegationTokenIdentifier); + RouterStoreToken resultRouterStoreToken = delegationTokenState.get(rMDelegationTokenIdentifier); + Assert.assertNotNull(resultRouterStoreToken); + long renewDate = resultRouterStoreToken.getRenewDate(); Assert.assertEquals(expDate, renewDate); } @@ -1700,7 +1707,8 @@ public void testCancelDelegationToken() throws IOException, YarnException { // Step3. Query the data in the StateStore and confirm that the Delegation has been deleted. // At this point, the size of delegationTokenState should be 0. RouterRMDTSecretManagerState managerState = stateStore.getRouterRMSecretManagerState(); - Map delegationTokenState = managerState.getTokenState(); + Map delegationTokenState = + managerState.getTokenState(); Assert.assertNotNull(delegationTokenState); Assert.assertEquals(0, delegationTokenState.size()); }