From fb31393b65b7a877ec256678e81a0f852b3cf14d Mon Sep 17 00:00:00 2001 From: hchaverr Date: Tue, 24 Jan 2023 10:43:36 -0800 Subject: [PATCH] HADOOP-18535. Implement token storage solution based on MySQL Fixes #1240 Signed-off-by: Owen O'Malley --- .../AbstractDelegationTokenSecretManager.java | 36 +- .../SQLDelegationTokenSecretManager.java | 400 +++++++++++++++ hadoop-hdfs-project/hadoop-hdfs-rbf/pom.xml | 22 + .../TokenStore/MySQL/TokenStoreDatabase.sql | 21 + .../TokenStore/MySQL/TokenStoreTables.sql | 52 ++ .../TokenStore/MySQL/TokenStoreUser.sql | 26 + .../hadoop-hdfs-rbf/scripts/TokenStore/README | 24 + .../security/token/DistributedSQLCounter.java | 138 +++++ .../HikariDataSourceConnectionFactory.java | 68 +++ .../security/token/SQLConnectionFactory.java | 54 ++ .../SQLDelegationTokenSecretManagerImpl.java | 242 +++++++++ .../SQLSecretManagerRetriableHandler.java | 133 +++++ ...stSQLDelegationTokenSecretManagerImpl.java | 471 ++++++++++++++++++ hadoop-project/pom.xml | 12 + 14 files changed, 1697 insertions(+), 2 deletions(-) create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/SQLDelegationTokenSecretManager.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs-rbf/scripts/TokenStore/MySQL/TokenStoreDatabase.sql create mode 100644 hadoop-hdfs-project/hadoop-hdfs-rbf/scripts/TokenStore/MySQL/TokenStoreTables.sql create mode 100644 hadoop-hdfs-project/hadoop-hdfs-rbf/scripts/TokenStore/MySQL/TokenStoreUser.sql create mode 100644 hadoop-hdfs-project/hadoop-hdfs-rbf/scripts/TokenStore/README create mode 100644 hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/DistributedSQLCounter.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/HikariDataSourceConnectionFactory.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/SQLConnectionFactory.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/SQLDelegationTokenSecretManagerImpl.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/SQLSecretManagerRetriableHandler.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/security/token/TestSQLDelegationTokenSecretManagerImpl.java diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java index 8aaf9bbd8d..cde4cf4841 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java @@ -19,7 +19,9 @@ package org.apache.hadoop.security.token.delegation; import java.io.ByteArrayInputStream; +import java.io.DataInput; import java.io.DataInputStream; +import java.io.DataOutput; import java.io.IOException; import java.security.MessageDigest; import java.util.ArrayList; @@ -41,6 +43,8 @@ import org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding; import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore; import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.metrics2.annotation.Metric; import org.apache.hadoop.metrics2.annotation.Metrics; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; @@ -441,8 +445,9 @@ private void updateCurrentKey() throws IOException { /** * Update the current master key for generating delegation tokens * It should be called only by tokenRemoverThread. + * @throws IOException raised on errors performing I/O. */ - void rollMasterKey() throws IOException { + protected void rollMasterKey() throws IOException { synchronized (this) { removeExpiredKeys(); /* set final expiry date for retiring currentKey */ @@ -677,11 +682,15 @@ public static SecretKey createSecretKey(byte[] key) { /** Class to encapsulate a token's renew date and password. */ @InterfaceStability.Evolving - public static class DelegationTokenInformation { + public static class DelegationTokenInformation implements Writable { long renewDate; byte[] password; String trackingId; + public DelegationTokenInformation() { + this(0, null); + } + public DelegationTokenInformation(long renewDate, byte[] password) { this(renewDate, password, null); } @@ -711,6 +720,29 @@ byte[] getPassword() { public String getTrackingId() { return trackingId; } + + @Override + public void write(DataOutput out) throws IOException { + WritableUtils.writeVLong(out, renewDate); + if (password == null) { + WritableUtils.writeVInt(out, -1); + } else { + WritableUtils.writeVInt(out, password.length); + out.write(password); + } + WritableUtils.writeString(out, trackingId); + } + + @Override + public void readFields(DataInput in) throws IOException { + renewDate = WritableUtils.readVLong(in); + int len = WritableUtils.readVInt(in); + if (len > -1) { + password = new byte[len]; + in.readFully(password); + } + trackingId = WritableUtils.readString(in); + } } /** Remove expired delegation tokens from cache */ diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/SQLDelegationTokenSecretManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/SQLDelegationTokenSecretManager.java new file mode 100644 index 0000000000..4b6ae21d7a --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/SQLDelegationTokenSecretManager.java @@ -0,0 +1,400 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.security.token.delegation; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.sql.SQLException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.delegation.web.DelegationTokenManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * An implementation of {@link AbstractDelegationTokenSecretManager} that + * persists TokenIdentifiers and DelegationKeys in an existing SQL database. + */ +public abstract class SQLDelegationTokenSecretManager + extends AbstractDelegationTokenSecretManager { + + private static final Logger LOG = + LoggerFactory.getLogger(SQLDelegationTokenSecretManager.class); + + public static final String SQL_DTSM_CONF_PREFIX = "sql-dt-secret-manager."; + private static final String SQL_DTSM_TOKEN_SEQNUM_BATCH_SIZE = SQL_DTSM_CONF_PREFIX + + "token.seqnum.batch.size"; + public static final int DEFAULT_SEQ_NUM_BATCH_SIZE = 10; + + // Batch of sequence numbers that will be requested by the sequenceNumCounter. + // A new batch is requested once the sequenceNums available to a secret manager are + // exhausted, including during initialization. + private final int seqNumBatchSize; + + // Last sequenceNum in the current batch that has been allocated to a token. + private int currentSeqNum; + + // Max sequenceNum in the current batch that can be allocated to a token. + // Unused sequenceNums in the current batch cannot be reused by other routers. + private int currentMaxSeqNum; + + public SQLDelegationTokenSecretManager(Configuration conf) { + super(conf.getLong(DelegationTokenManager.UPDATE_INTERVAL, + DelegationTokenManager.UPDATE_INTERVAL_DEFAULT) * 1000, + conf.getLong(DelegationTokenManager.MAX_LIFETIME, + DelegationTokenManager.MAX_LIFETIME_DEFAULT) * 1000, + conf.getLong(DelegationTokenManager.RENEW_INTERVAL, + DelegationTokenManager.RENEW_INTERVAL_DEFAULT) * 1000, + conf.getLong(DelegationTokenManager.REMOVAL_SCAN_INTERVAL, + DelegationTokenManager.REMOVAL_SCAN_INTERVAL_DEFAULT) * 1000); + + this.seqNumBatchSize = conf.getInt(SQL_DTSM_TOKEN_SEQNUM_BATCH_SIZE, + DEFAULT_SEQ_NUM_BATCH_SIZE); + } + + /** + * Persists a TokenIdentifier and its corresponding TokenInformation into + * the SQL database. The TokenIdentifier is expected to be unique and any + * duplicate token attempts will result in an IOException. + * @param ident TokenIdentifier to persist. + * @param tokenInfo DelegationTokenInformation associated with the TokenIdentifier. + */ + @Override + protected void storeToken(TokenIdent ident, + DelegationTokenInformation tokenInfo) throws IOException { + try (ByteArrayOutputStream bos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(bos)) { + tokenInfo.write(dos); + // Add token to SQL database + insertToken(ident.getSequenceNumber(), ident.getBytes(), bos.toByteArray()); + // Add token to local cache + super.storeToken(ident, tokenInfo); + } catch (SQLException e) { + throw new IOException("Failed to store token in SQL secret manager", e); + } + } + + /** + * Updates the TokenInformation of an existing TokenIdentifier in + * the SQL database. + * @param ident Existing TokenIdentifier in the SQL database. + * @param tokenInfo Updated DelegationTokenInformation associated with the TokenIdentifier. + */ + @Override + protected void updateToken(TokenIdent ident, + DelegationTokenInformation tokenInfo) throws IOException { + try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) { + try (DataOutputStream dos = new DataOutputStream(bos)) { + tokenInfo.write(dos); + // Update token in SQL database + updateToken(ident.getSequenceNumber(), ident.getBytes(), bos.toByteArray()); + // Update token in local cache + super.updateToken(ident, tokenInfo); + } + } catch (SQLException e) { + throw new IOException("Failed to update token in SQL secret manager", e); + } + } + + /** + * Cancels a token by removing it from the SQL database. This will + * call the corresponding method in {@link AbstractDelegationTokenSecretManager} + * to perform validation and remove the token from the cache. + * @return Identifier of the canceled token + */ + @Override + public synchronized TokenIdent cancelToken(Token token, + String canceller) throws IOException { + try (ByteArrayInputStream bis = new ByteArrayInputStream(token.getIdentifier()); + DataInputStream din = new DataInputStream(bis)) { + TokenIdent id = createIdentifier(); + id.readFields(din); + + // Calling getTokenInfo to load token into local cache if not present. + // super.cancelToken() requires token to be present in local cache. + getTokenInfo(id); + } + + return super.cancelToken(token, canceller); + } + + /** + * Removes the existing TokenInformation from the SQL database to + * invalidate it. + * @param ident TokenInformation to remove from the SQL database. + */ + @Override + protected void removeStoredToken(TokenIdent ident) throws IOException { + try { + deleteToken(ident.getSequenceNumber(), ident.getBytes()); + } catch (SQLException e) { + LOG.warn("Failed to remove token in SQL secret manager", e); + } + } + + /** + * Obtains the DelegationTokenInformation associated with the given + * TokenIdentifier in the SQL database. + * @param ident Existing TokenIdentifier in the SQL database. + * @return DelegationTokenInformation that matches the given TokenIdentifier or + * null if it doesn't exist in the database. + */ + @Override + protected DelegationTokenInformation getTokenInfo(TokenIdent ident) { + // Look for token in local cache + DelegationTokenInformation tokenInfo = super.getTokenInfo(ident); + + if (tokenInfo == null) { + try { + // Look for token in SQL database + byte[] tokenInfoBytes = selectTokenInfo(ident.getSequenceNumber(), ident.getBytes()); + + if (tokenInfoBytes != null) { + tokenInfo = new DelegationTokenInformation(); + try (ByteArrayInputStream bis = new ByteArrayInputStream(tokenInfoBytes)) { + try (DataInputStream dis = new DataInputStream(bis)) { + tokenInfo.readFields(dis); + } + } + + // Update token in local cache + currentTokens.put(ident, tokenInfo); + } + } catch (IOException | SQLException e) { + LOG.error("Failed to get token in SQL secret manager", e); + } + } + + return tokenInfo; + } + + /** + * Obtains the value of the last reserved sequence number. + * @return Last reserved sequence number. + */ + @Override + public int getDelegationTokenSeqNum() { + try { + return selectSequenceNum(); + } catch (SQLException e) { + throw new RuntimeException( + "Failed to get token sequence number in SQL secret manager", e); + } + } + + /** + * Updates the value of the last reserved sequence number. + * @param seqNum Value to update the sequence number to. + */ + @Override + public void setDelegationTokenSeqNum(int seqNum) { + try { + updateSequenceNum(seqNum); + } catch (SQLException e) { + throw new RuntimeException( + "Failed to update token sequence number in SQL secret manager", e); + } + } + + /** + * Obtains the next available sequence number that can be allocated to a Token. + * Sequence numbers need to be reserved using the shared sequenceNumberCounter once + * the local batch has been exhausted, which handles sequenceNumber allocation + * concurrently with other secret managers. + * This method ensures that sequence numbers are incremental in a single secret manager, + * but not across secret managers. + * @return Next available sequence number. + */ + @Override + public synchronized int incrementDelegationTokenSeqNum() { + if (currentSeqNum >= currentMaxSeqNum) { + try { + // Request a new batch of sequence numbers and use the + // lowest one available. + currentSeqNum = incrementSequenceNum(seqNumBatchSize); + currentMaxSeqNum = currentSeqNum + seqNumBatchSize; + } catch (SQLException e) { + throw new RuntimeException( + "Failed to increment token sequence number in SQL secret manager", e); + } + } + + return ++currentSeqNum; + } + + /** + * Persists a DelegationKey into the SQL database. The delegation keyId + * is expected to be unique and any duplicate key attempts will result + * in an IOException. + * @param key DelegationKey to persist into the SQL database. + */ + @Override + protected void storeDelegationKey(DelegationKey key) throws IOException { + try (ByteArrayOutputStream bos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(bos)) { + key.write(dos); + // Add delegation key to SQL database + insertDelegationKey(key.getKeyId(), bos.toByteArray()); + // Add delegation key to local cache + super.storeDelegationKey(key); + } catch (SQLException e) { + throw new IOException("Failed to store delegation key in SQL secret manager", e); + } + } + + /** + * Updates an existing DelegationKey in the SQL database. + * @param key Updated DelegationKey. + */ + @Override + protected void updateDelegationKey(DelegationKey key) throws IOException { + try (ByteArrayOutputStream bos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(bos)) { + key.write(dos); + // Update delegation key in SQL database + updateDelegationKey(key.getKeyId(), bos.toByteArray()); + // Update delegation key in local cache + super.updateDelegationKey(key); + } catch (SQLException e) { + throw new IOException("Failed to update delegation key in SQL secret manager", e); + } + } + + /** + * Removes the existing DelegationKey from the SQL database to + * invalidate it. + * @param key DelegationKey to remove from the SQL database. + */ + @Override + protected void removeStoredMasterKey(DelegationKey key) { + try { + deleteDelegationKey(key.getKeyId()); + } catch (SQLException e) { + LOG.warn("Failed to remove delegation key in SQL secret manager", e); + } + } + + /** + * Obtains the DelegationKey from the SQL database. + * @param keyId KeyId of the DelegationKey to obtain. + * @return DelegationKey that matches the given keyId or null + * if it doesn't exist in the database. + */ + @Override + protected DelegationKey getDelegationKey(int keyId) { + // Look for delegation key in local cache + DelegationKey delegationKey = super.getDelegationKey(keyId); + + if (delegationKey == null) { + try { + // Look for delegation key in SQL database + byte[] delegationKeyBytes = selectDelegationKey(keyId); + + if (delegationKeyBytes != null) { + delegationKey = new DelegationKey(); + try (ByteArrayInputStream bis = new ByteArrayInputStream(delegationKeyBytes)) { + try (DataInputStream dis = new DataInputStream(bis)) { + delegationKey.readFields(dis); + } + } + + // Update delegation key in local cache + allKeys.put(keyId, delegationKey); + } + } catch (IOException | SQLException e) { + LOG.error("Failed to get delegation key in SQL secret manager", e); + } + } + + return delegationKey; + } + + /** + * Obtains the value of the last delegation key id. + * @return Last delegation key id. + */ + @Override + public int getCurrentKeyId() { + try { + return selectKeyId(); + } catch (SQLException e) { + throw new RuntimeException( + "Failed to get delegation key id in SQL secret manager", e); + } + } + + /** + * Updates the value of the last delegation key id. + * @param keyId Value to update the delegation key id to. + */ + @Override + public void setCurrentKeyId(int keyId) { + try { + updateKeyId(keyId); + } catch (SQLException e) { + throw new RuntimeException( + "Failed to set delegation key id in SQL secret manager", e); + } + } + + /** + * Obtains the next available delegation key id that can be allocated to a DelegationKey. + * Delegation key id need to be reserved using the shared delegationKeyIdCounter, + * which handles keyId allocation concurrently with other secret managers. + * @return Next available delegation key id. + */ + @Override + public int incrementCurrentKeyId() { + try { + return incrementKeyId(1) + 1; + } catch (SQLException e) { + throw new RuntimeException( + "Failed to increment delegation key id in SQL secret manager", e); + } + } + + // Token operations in SQL database + protected abstract byte[] selectTokenInfo(int sequenceNum, byte[] tokenIdentifier) + throws SQLException; + protected abstract void insertToken(int sequenceNum, byte[] tokenIdentifier, byte[] tokenInfo) + throws SQLException; + protected abstract void updateToken(int sequenceNum, byte[] tokenIdentifier, byte[] tokenInfo) + throws SQLException; + protected abstract void deleteToken(int sequenceNum, byte[] tokenIdentifier) + throws SQLException; + // Delegation key operations in SQL database + protected abstract byte[] selectDelegationKey(int keyId) throws SQLException; + protected abstract void insertDelegationKey(int keyId, byte[] delegationKey) + throws SQLException; + protected abstract void updateDelegationKey(int keyId, byte[] delegationKey) + throws SQLException; + protected abstract void deleteDelegationKey(int keyId) throws SQLException; + // Counter operations in SQL database + protected abstract int selectSequenceNum() throws SQLException; + protected abstract void updateSequenceNum(int value) throws SQLException; + protected abstract int incrementSequenceNum(int amount) throws SQLException; + protected abstract int selectKeyId() throws SQLException; + protected abstract void updateKeyId(int value) throws SQLException; + protected abstract int incrementKeyId(int amount) throws SQLException; +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/pom.xml b/hadoop-hdfs-project/hadoop-hdfs-rbf/pom.xml index 9fb868f79f..9e6f12ba8a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/pom.xml +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/pom.xml @@ -117,6 +117,15 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd"> com.fasterxml.jackson.core jackson-databind + + com.zaxxer + HikariCP + + + mysql + mysql-connector-java + provided + junit junit @@ -153,6 +162,11 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd"> curator-test test + + org.apache.derby + derby + test + org.mockito mockito-core @@ -170,6 +184,14 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd"> org.apache.maven.plugins maven-surefire-plugin + + + + derby.stream.error.file + ${project.build.directory}/derby.log + + + org.apache.maven.plugins diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/scripts/TokenStore/MySQL/TokenStoreDatabase.sql b/hadoop-hdfs-project/hadoop-hdfs-rbf/scripts/TokenStore/MySQL/TokenStoreDatabase.sql new file mode 100644 index 0000000000..07fea4c24b --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/scripts/TokenStore/MySQL/TokenStoreDatabase.sql @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- Script to create a new Database in MySQL for the TokenStore + +CREATE DATABASE IF NOT EXISTS TokenStore; diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/scripts/TokenStore/MySQL/TokenStoreTables.sql b/hadoop-hdfs-project/hadoop-hdfs-rbf/scripts/TokenStore/MySQL/TokenStoreTables.sql new file mode 100644 index 0000000000..d377c4e15f --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/scripts/TokenStore/MySQL/TokenStoreTables.sql @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- Script to generate all the tables for the TokenStore in MySQL + +USE TokenStore + +CREATE TABLE IF NOT EXISTS Tokens( + sequenceNum int NOT NULL, + tokenIdentifier varbinary(255) NOT NULL, + tokenInfo varbinary(255) NOT NULL, + modifiedTime timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + PRIMARY KEY(sequenceNum, tokenIdentifier) +); + +CREATE TABLE IF NOT EXISTS DelegationKeys( + keyId int NOT NULL, + delegationKey varbinary(255) NOT NULL, + modifiedTime timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + PRIMARY KEY(keyId) +); + +CREATE TABLE IF NOT EXISTS LastSequenceNum( + sequenceNum int NOT NULL +); + +-- Initialize the LastSequenceNum table with a single entry +INSERT INTO LastSequenceNum (sequenceNum) +SELECT 0 WHERE NOT EXISTS (SELECT * FROM LastSequenceNum); + +CREATE TABLE IF NOT EXISTS LastDelegationKeyId( + keyId int NOT NULL +); + +-- Initialize the LastDelegationKeyId table with a single entry +INSERT INTO LastDelegationKeyId (keyId) +SELECT 0 WHERE NOT EXISTS (SELECT * FROM LastDelegationKeyId); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/scripts/TokenStore/MySQL/TokenStoreUser.sql b/hadoop-hdfs-project/hadoop-hdfs-rbf/scripts/TokenStore/MySQL/TokenStoreUser.sql new file mode 100644 index 0000000000..844d7a2f94 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/scripts/TokenStore/MySQL/TokenStoreUser.sql @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- Script to create a new User in MySQL for the TokenStore + +-- Update TokenStore user and password on this script +CREATE USER IF NOT EXISTS 'TokenStoreUser'@'%' IDENTIFIED BY 'TokenStorePassword'; + +GRANT ALL PRIVILEGES ON TokenStore.* TO 'TokenStoreUser'@'%'; + +FLUSH PRIVILEGES; diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/scripts/TokenStore/README b/hadoop-hdfs-project/hadoop-hdfs-rbf/scripts/TokenStore/README new file mode 100644 index 0000000000..7242531531 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/scripts/TokenStore/README @@ -0,0 +1,24 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +These scripts must be executed to create the TokenStore database, tables and users needed to use the +SQLDelegationTokenSecretManagerImpl as the delegation token secret manager: +1. TokenStoreDatabase.sql +2. TokenStoreTables.sql +3. TokenStoreUser.sql + +Note: The TokenStoreUser.sql defines a default user/password. You are highly encouraged to set +this to a proper strong password. diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/DistributedSQLCounter.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/DistributedSQLCounter.java new file mode 100644 index 0000000000..14b232783f --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/DistributedSQLCounter.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.federation.router.security.token; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Distributed counter that relies on a SQL database to synchronize + * between multiple clients. This expects a table with a single int field + * to exist in the database. One record must exist on the table at all times, + * representing the last used value reserved by a client. + */ +public class DistributedSQLCounter { + private static final Logger LOG = + LoggerFactory.getLogger(DistributedSQLCounter.class); + + private final String field; + private final String table; + private final SQLConnectionFactory connectionFactory; + + public DistributedSQLCounter(String field, String table, + SQLConnectionFactory connectionFactory) { + this.field = field; + this.table = table; + this.connectionFactory = connectionFactory; + } + + /** + * Obtains the value of the counter. + * @return counter value. + */ + public int selectCounterValue() throws SQLException { + try (Connection connection = connectionFactory.getConnection()) { + return selectCounterValue(false, connection); + } + } + + private int selectCounterValue(boolean forUpdate, Connection connection) throws SQLException { + String query = String.format("SELECT %s FROM %s %s", field, table, + forUpdate ? "FOR UPDATE" : ""); + LOG.debug("Select counter statement: " + query); + try (Statement statement = connection.createStatement(); + ResultSet result = statement.executeQuery(query)) { + if (result.next()) { + return result.getInt(field); + } else { + throw new IllegalStateException("Counter table not initialized: " + table); + } + } + } + + /** + * Sets the counter to the given value. + * @param value Value to assign to counter. + */ + public void updateCounterValue(int value) throws SQLException { + try (Connection connection = connectionFactory.getConnection(true)) { + updateCounterValue(value, connection); + } + } + + /** + * Sets the counter to the given value. + * @param connection Connection to database hosting the counter table. + * @param value Value to assign to counter. + */ + public void updateCounterValue(int value, Connection connection) throws SQLException { + String queryText = String.format("UPDATE %s SET %s = ?", table, field); + LOG.debug("Update counter statement: " + queryText + ". Value: " + value); + try (PreparedStatement statement = connection.prepareStatement(queryText)) { + statement.setInt(1, value); + statement.execute(); + } + } + + /** + * Increments the counter by the given amount and + * returns the previous counter value. + * @param amount Amount to increase the counter. + * @return Previous counter value. + */ + public int incrementCounterValue(int amount) throws SQLException { + // Disabling auto-commit to ensure that all statements on this transaction + // are committed at once. + try (Connection connection = connectionFactory.getConnection(false)) { + // Preventing dirty reads and non-repeatable reads to ensure that the + // value read will not be updated by a different connection. + if (connection.getTransactionIsolation() < Connection.TRANSACTION_REPEATABLE_READ) { + connection.setTransactionIsolation(Connection.TRANSACTION_REPEATABLE_READ); + } + + try { + // Reading the counter value "FOR UPDATE" to lock the value record, + // forcing other connections to wait until this transaction is committed. + int lastValue = selectCounterValue(true, connection); + + // Calculate the new counter value and handling overflow by + // resetting the counter to 0. + int newValue = lastValue + amount; + if (newValue < 0) { + lastValue = 0; + newValue = amount; + } + + updateCounterValue(newValue, connection); + connection.commit(); + return lastValue; + } catch (Exception e) { + // Rollback transaction to release table locks + connection.rollback(); + throw e; + } + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/HikariDataSourceConnectionFactory.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/HikariDataSourceConnectionFactory.java new file mode 100644 index 0000000000..5510e9f54b --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/HikariDataSourceConnectionFactory.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.federation.router.security.token; + +import com.zaxxer.hikari.HikariConfig; +import com.zaxxer.hikari.HikariDataSource; +import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.token.delegation.SQLDelegationTokenSecretManager; + +import java.sql.Connection; +import java.sql.SQLException; +import java.util.Properties; + +/** + * Class that relies on a HikariDataSource to provide SQL connections. + */ +class HikariDataSourceConnectionFactory implements SQLConnectionFactory { + protected final static String HIKARI_PROPS = SQLDelegationTokenSecretManager.SQL_DTSM_CONF_PREFIX + + "connection.hikari."; + private final HikariDataSource dataSource; + + HikariDataSourceConnectionFactory(Configuration conf) { + Properties properties = new Properties(); + properties.setProperty("jdbcUrl", conf.get(CONNECTION_URL)); + properties.setProperty("username", conf.get(CONNECTION_USERNAME)); + properties.setProperty("password", conf.get(CONNECTION_PASSWORD)); + properties.setProperty("driverClassName", conf.get(CONNECTION_DRIVER)); + + // Include hikari connection properties + properties.putAll(conf.getPropsWithPrefix(HIKARI_PROPS)); + + HikariConfig hikariConfig = new HikariConfig(properties); + this.dataSource = new HikariDataSource(hikariConfig); + } + + @Override + public Connection getConnection() throws SQLException { + return dataSource.getConnection(); + } + + @Override + public void shutdown() { + // Close database connections + dataSource.close(); + } + + @VisibleForTesting + HikariDataSource getDataSource() { + return dataSource; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/SQLConnectionFactory.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/SQLConnectionFactory.java new file mode 100644 index 0000000000..a464cc8196 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/SQLConnectionFactory.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.federation.router.security.token; + +import com.mysql.cj.jdbc.MysqlDataSource; +import com.zaxxer.hikari.HikariConfig; +import com.zaxxer.hikari.HikariDataSource; +import java.sql.Connection; +import java.sql.SQLException; +import java.util.Properties; +import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.token.delegation.SQLDelegationTokenSecretManager; + + +/** + * Interface to provide SQL connections to the {@link SQLDelegationTokenSecretManagerImpl}. + */ +public interface SQLConnectionFactory { + String CONNECTION_URL = SQLDelegationTokenSecretManager.SQL_DTSM_CONF_PREFIX + + "connection.url"; + String CONNECTION_USERNAME = SQLDelegationTokenSecretManager.SQL_DTSM_CONF_PREFIX + + "connection.username"; + String CONNECTION_PASSWORD = SQLDelegationTokenSecretManager.SQL_DTSM_CONF_PREFIX + + "connection.password"; + String CONNECTION_DRIVER = SQLDelegationTokenSecretManager.SQL_DTSM_CONF_PREFIX + + "connection.driver"; + + Connection getConnection() throws SQLException; + void shutdown(); + + default Connection getConnection(boolean autocommit) throws SQLException { + Connection connection = getConnection(); + connection.setAutoCommit(autocommit); + return connection; + } +} + diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/SQLDelegationTokenSecretManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/SQLDelegationTokenSecretManagerImpl.java new file mode 100644 index 0000000000..7da54778f3 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/SQLDelegationTokenSecretManagerImpl.java @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.federation.router.security.token; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier; +import org.apache.hadoop.security.token.delegation.SQLDelegationTokenSecretManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * An implementation of {@link SQLDelegationTokenSecretManager} that + * persists TokenIdentifiers and DelegationKeys in a SQL database. + * This implementation relies on the Datanucleus JDO PersistenceManager, which + * can be configured with datanucleus.* configuration properties. + */ +public class SQLDelegationTokenSecretManagerImpl + extends SQLDelegationTokenSecretManager { + + private static final Logger LOG = + LoggerFactory.getLogger(SQLDelegationTokenSecretManagerImpl.class); + private static final String SEQ_NUM_COUNTER_FIELD = "sequenceNum"; + private static final String SEQ_NUM_COUNTER_TABLE = "LastSequenceNum"; + private static final String KEY_ID_COUNTER_FIELD = "keyId"; + private static final String KEY_ID_COUNTER_TABLE = "LastDelegationKeyId"; + + private final SQLConnectionFactory connectionFactory; + private final DistributedSQLCounter sequenceNumCounter; + private final DistributedSQLCounter delegationKeyIdCounter; + private final SQLSecretManagerRetriableHandler retryHandler; + + public SQLDelegationTokenSecretManagerImpl(Configuration conf) { + this(conf, new HikariDataSourceConnectionFactory(conf), + SQLSecretManagerRetriableHandlerImpl.getInstance(conf)); + } + + public SQLDelegationTokenSecretManagerImpl(Configuration conf, + SQLConnectionFactory connectionFactory, SQLSecretManagerRetriableHandler retryHandler) { + super(conf); + + this.connectionFactory = connectionFactory; + this.sequenceNumCounter = new DistributedSQLCounter(SEQ_NUM_COUNTER_FIELD, + SEQ_NUM_COUNTER_TABLE, connectionFactory); + this.delegationKeyIdCounter = new DistributedSQLCounter(KEY_ID_COUNTER_FIELD, + KEY_ID_COUNTER_TABLE, connectionFactory); + this.retryHandler = retryHandler; + + try { + super.startThreads(); + } catch (IOException e) { + throw new RuntimeException("Error starting threads for MySQL secret manager", e); + } + + LOG.info("MySQL delegation token secret manager instantiated"); + } + + @Override + public DelegationTokenIdentifier createIdentifier() { + return new DelegationTokenIdentifier(); + } + + @Override + public void stopThreads() { + super.stopThreads(); + connectionFactory.shutdown(); + } + + @Override + protected void insertToken(int sequenceNum, byte[] tokenIdentifier, byte[] tokenInfo) + throws SQLException { + retryHandler.execute(() -> { + try (Connection connection = connectionFactory.getConnection(true); + PreparedStatement statement = connection.prepareStatement( + "INSERT INTO Tokens (sequenceNum, tokenIdentifier, tokenInfo) VALUES (?, ?, ?)")) { + statement.setInt(1, sequenceNum); + statement.setBytes(2, tokenIdentifier); + statement.setBytes(3, tokenInfo); + statement.execute(); + } + }); + } + + @Override + protected void updateToken(int sequenceNum, byte[] tokenIdentifier, byte[] tokenInfo) + throws SQLException { + retryHandler.execute(() -> { + try (Connection connection = connectionFactory.getConnection(true); + PreparedStatement statement = connection.prepareStatement( + "UPDATE Tokens SET tokenInfo = ? WHERE sequenceNum = ? AND tokenIdentifier = ?")) { + statement.setBytes(1, tokenInfo); + statement.setInt(2, sequenceNum); + statement.setBytes(3, tokenIdentifier); + statement.execute(); + } + }); + } + + @Override + protected void deleteToken(int sequenceNum, byte[] tokenIdentifier) throws SQLException { + retryHandler.execute(() -> { + try (Connection connection = connectionFactory.getConnection(true); + PreparedStatement statement = connection.prepareStatement( + "DELETE FROM Tokens WHERE sequenceNum = ? AND tokenIdentifier = ?")) { + statement.setInt(1, sequenceNum); + statement.setBytes(2, tokenIdentifier); + statement.execute(); + } + }); + } + + @Override + protected byte[] selectTokenInfo(int sequenceNum, byte[] tokenIdentifier) throws SQLException { + return retryHandler.execute(() -> { + try (Connection connection = connectionFactory.getConnection(); + PreparedStatement statement = connection.prepareStatement( + "SELECT tokenInfo FROM Tokens WHERE sequenceNum = ? AND tokenIdentifier = ?")) { + statement.setInt(1, sequenceNum); + statement.setBytes(2, tokenIdentifier); + try (ResultSet result = statement.executeQuery()) { + if (result.next()) { + return result.getBytes("tokenInfo"); + } + } + } + return null; + }); + } + + @Override + protected void insertDelegationKey(int keyId, byte[] delegationKey) throws SQLException { + retryHandler.execute(() -> { + try (Connection connection = connectionFactory.getConnection(true); + PreparedStatement statement = connection.prepareStatement( + "INSERT INTO DelegationKeys (keyId, delegationKey) VALUES (?, ?)")) { + statement.setInt(1, keyId); + statement.setBytes(2, delegationKey); + statement.execute(); + } + }); + } + + @Override + protected void updateDelegationKey(int keyId, byte[] delegationKey) throws SQLException { + retryHandler.execute(() -> { + try (Connection connection = connectionFactory.getConnection(true); + PreparedStatement statement = connection.prepareStatement( + "UPDATE DelegationKeys SET delegationKey = ? WHERE keyId = ?")) { + statement.setBytes(1, delegationKey); + statement.setInt(2, keyId); + statement.execute(); + } + }); + } + + @Override + protected void deleteDelegationKey(int keyId) throws SQLException { + retryHandler.execute(() -> { + try (Connection connection = connectionFactory.getConnection(true); + PreparedStatement statement = connection.prepareStatement( + "DELETE FROM DelegationKeys WHERE keyId = ?")) { + statement.setInt(1, keyId); + statement.execute(); + } + }); + } + + @Override + protected byte[] selectDelegationKey(int keyId) throws SQLException { + return retryHandler.execute(() -> { + try (Connection connection = connectionFactory.getConnection(); + PreparedStatement statement = connection.prepareStatement( + "SELECT delegationKey FROM DelegationKeys WHERE keyId = ?")) { + statement.setInt(1, keyId); + try (ResultSet result = statement.executeQuery()) { + if (result.next()) { + return result.getBytes("delegationKey"); + } + } + } + return null; + }); + } + + @Override + protected int selectSequenceNum() throws SQLException { + return retryHandler.execute(() -> sequenceNumCounter.selectCounterValue()); + } + + @Override + protected void updateSequenceNum(int value) throws SQLException { + retryHandler.execute(() -> sequenceNumCounter.updateCounterValue(value)); + } + + @Override + protected int incrementSequenceNum(int amount) throws SQLException { + return retryHandler.execute(() -> sequenceNumCounter.incrementCounterValue(amount)); + } + + @Override + protected int selectKeyId() throws SQLException { + return retryHandler.execute(delegationKeyIdCounter::selectCounterValue); + } + + @Override + protected void updateKeyId(int value) throws SQLException { + retryHandler.execute(() -> delegationKeyIdCounter.updateCounterValue(value)); + } + + @Override + protected int incrementKeyId(int amount) throws SQLException { + return retryHandler.execute(() -> delegationKeyIdCounter.incrementCounterValue(amount)); + } + + @VisibleForTesting + protected SQLConnectionFactory getConnectionFactory() { + return connectionFactory; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/SQLSecretManagerRetriableHandler.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/SQLSecretManagerRetriableHandler.java new file mode 100644 index 0000000000..1615122621 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/SQLSecretManagerRetriableHandler.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.federation.router.security.token; + +import java.sql.SQLException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.retry.RetryPolicies; +import org.apache.hadoop.io.retry.RetryPolicy; +import org.apache.hadoop.io.retry.RetryProxy; +import org.apache.hadoop.security.token.delegation.SQLDelegationTokenSecretManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Interface to handle retries when {@link SQLDelegationTokenSecretManagerImpl} + * throws expected errors. + */ +public interface SQLSecretManagerRetriableHandler { + void execute(SQLCommandVoid command) throws SQLException; + T execute(SQLCommand command) throws SQLException; + + @FunctionalInterface + interface SQLCommandVoid { + void doCall() throws SQLException; + } + + @FunctionalInterface + interface SQLCommand { + T doCall() throws SQLException; + } +} + +/** + * Implementation of {@link SQLSecretManagerRetriableHandler} that uses a + * {@link RetryProxy} to simplify the retryable operations. + */ +class SQLSecretManagerRetriableHandlerImpl implements SQLSecretManagerRetriableHandler { + public final static String MAX_RETRIES = + SQLDelegationTokenSecretManager.SQL_DTSM_CONF_PREFIX + "max-retries"; + public final static int MAX_RETRIES_DEFAULT = 0; + public final static String RETRY_SLEEP_TIME_MS = + SQLDelegationTokenSecretManager.SQL_DTSM_CONF_PREFIX + "retry-sleep-time-ms"; + public final static long RETRY_SLEEP_TIME_MS_DEFAULT = 100; + + private static final Logger LOG = + LoggerFactory.getLogger(SQLSecretManagerRetriableHandlerImpl.class); + + static SQLSecretManagerRetriableHandler getInstance(Configuration conf) { + return getInstance(conf, new SQLSecretManagerRetriableHandlerImpl()); + } + + static SQLSecretManagerRetriableHandler getInstance(Configuration conf, + SQLSecretManagerRetriableHandlerImpl retryHandler) { + RetryPolicy basePolicy = RetryPolicies.exponentialBackoffRetry( + conf.getInt(MAX_RETRIES, MAX_RETRIES_DEFAULT), + conf.getLong(RETRY_SLEEP_TIME_MS, RETRY_SLEEP_TIME_MS_DEFAULT), + TimeUnit.MILLISECONDS); + + // Configure SQLSecretManagerRetriableException to retry with exponential backoff + Map, RetryPolicy> exceptionToPolicyMap = new HashMap<>(); + exceptionToPolicyMap.put(SQLSecretManagerRetriableException.class, basePolicy); + + // Configure all other exceptions to fail after one attempt + RetryPolicy retryPolicy = RetryPolicies.retryByException( + RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap); + + return (SQLSecretManagerRetriableHandler) RetryProxy.create( + SQLSecretManagerRetriableHandler.class, retryHandler, retryPolicy); + } + + /** + * Executes a SQL command and raises retryable errors as + * {@link SQLSecretManagerRetriableException}s so they are recognized by the + * {@link RetryProxy}. + * @param command SQL command to execute + * @throws SQLException When SQL connection errors occur + */ + @Override + public void execute(SQLCommandVoid command) throws SQLException { + try { + command.doCall(); + } catch (SQLException e) { + LOG.warn("Failed to execute SQL command", e); + throw new SQLSecretManagerRetriableException(e); + } + } + + /** + * Executes a SQL command and raises retryable errors as + * {@link SQLSecretManagerRetriableException}s so they are recognized by the + * {@link RetryProxy}. + * @param command SQL command to execute + * @throws SQLException When SQL connection errors occur + */ + @Override + public T execute(SQLCommand command) throws SQLException { + try { + return command.doCall(); + } catch (SQLException e) { + LOG.warn("Failed to execute SQL command", e); + throw new SQLSecretManagerRetriableException(e); + } + } + + /** + * Class used to identify errors that can be retried. + */ + static class SQLSecretManagerRetriableException extends SQLException { + SQLSecretManagerRetriableException(Throwable cause) { + super(cause); + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/security/token/TestSQLDelegationTokenSecretManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/security/token/TestSQLDelegationTokenSecretManagerImpl.java new file mode 100644 index 0000000000..569a274042 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/security/token/TestSQLDelegationTokenSecretManagerImpl.java @@ -0,0 +1,471 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.federation.router.security.token; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantLock; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier; +import org.apache.hadoop.security.token.delegation.web.DelegationTokenManager; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + + +public class TestSQLDelegationTokenSecretManagerImpl { + private static final String CONNECTION_URL = "jdbc:derby:memory:TokenStore"; + private static final int TEST_MAX_RETRIES = 3; + private static Configuration conf; + + @Before + public void init() throws SQLException { + createTestDBTables(); + } + + @After + public void cleanup() throws SQLException { + dropTestDBTables(); + } + + @BeforeClass + public static void initDatabase() throws SQLException { + DriverManager.getConnection(CONNECTION_URL + ";create=true"); + + conf = new Configuration(); + conf.set(SQLConnectionFactory.CONNECTION_URL, CONNECTION_URL); + conf.set(SQLConnectionFactory.CONNECTION_USERNAME, "testuser"); + conf.set(SQLConnectionFactory.CONNECTION_PASSWORD, "testpassword"); + conf.set(SQLConnectionFactory.CONNECTION_DRIVER, "org.apache.derby.jdbc.EmbeddedDriver"); + conf.setInt(SQLSecretManagerRetriableHandlerImpl.MAX_RETRIES, TEST_MAX_RETRIES); + conf.setInt(SQLSecretManagerRetriableHandlerImpl.RETRY_SLEEP_TIME_MS, 10); + } + + @AfterClass + public static void cleanupDatabase() { + try { + DriverManager.getConnection(CONNECTION_URL + ";drop=true"); + } catch (SQLException e) { + // SQLException expected when database is dropped + if (!e.getMessage().contains("dropped")) { + throw new RuntimeException(e); + } + } + } + + @Test + public void testSingleSecretManager() throws Exception { + DelegationTokenManager tokenManager = createTokenManager(); + try { + Token token = + tokenManager.createToken(UserGroupInformation.getCurrentUser(), "foo"); + validateToken(tokenManager, token); + } finally { + stopTokenManager(tokenManager); + } + } + + @Test + public void testMultipleSecretManagers() throws Exception { + DelegationTokenManager tokenManager1 = createTokenManager(); + DelegationTokenManager tokenManager2 = createTokenManager(); + + try { + Token token1 = + tokenManager1.createToken(UserGroupInformation.getCurrentUser(), "foo"); + Token token2 = + tokenManager2.createToken(UserGroupInformation.getCurrentUser(), "foo"); + + validateToken(tokenManager1, token2); + validateToken(tokenManager2, token1); + } finally { + stopTokenManager(tokenManager1); + stopTokenManager(tokenManager2); + } + } + + @Test + public void testSequenceNumAllocation() throws Exception { + int tokensPerManager = SQLDelegationTokenSecretManagerImpl.DEFAULT_SEQ_NUM_BATCH_SIZE * 5; + Set sequenceNums1 = new HashSet<>(); + Set sequenceNums2 = new HashSet<>(); + Set sequenceNums3 = new HashSet<>(); + Set sequenceNums = new HashSet<>(); + DelegationTokenManager tokenManager1 = createTokenManager(); + DelegationTokenManager tokenManager2 = createTokenManager(); + DelegationTokenManager tokenManager3 = createTokenManager(); + + try { + for (int i = 0; i < tokensPerManager; i++) { + allocateSequenceNum(tokenManager1, sequenceNums1); + allocateSequenceNum(tokenManager2, sequenceNums2); + allocateSequenceNum(tokenManager3, sequenceNums3); + sequenceNums.addAll(sequenceNums1); + sequenceNums.addAll(sequenceNums2); + sequenceNums.addAll(sequenceNums3); + } + + Assert.assertEquals("Verify that all tokens were created with unique sequence numbers", + tokensPerManager * 3, sequenceNums.size()); + Assert.assertEquals("Verify that tokenManager1 generated unique sequence numbers", + tokensPerManager, sequenceNums1.size()); + Assert.assertEquals("Verify that tokenManager2 generated unique sequence number", + tokensPerManager, sequenceNums2.size()); + Assert.assertEquals("Verify that tokenManager3 generated unique sequence numbers", + tokensPerManager, sequenceNums3.size()); + + // Validate sequence number batches allocated in order to each token manager + int batchSize = SQLDelegationTokenSecretManagerImpl.DEFAULT_SEQ_NUM_BATCH_SIZE; + for (int seqNum = 1; seqNum < tokensPerManager;) { + // First batch allocated tokenManager1 + for (int i = 0; i < batchSize; i++, seqNum++) { + Assert.assertTrue(sequenceNums1.contains(seqNum)); + } + // Second batch allocated tokenManager2 + for (int i = 0; i < batchSize; i++, seqNum++) { + Assert.assertTrue(sequenceNums2.contains(seqNum)); + } + // Third batch allocated tokenManager3 + for (int i = 0; i < batchSize; i++, seqNum++) { + Assert.assertTrue(sequenceNums3.contains(seqNum)); + } + } + + SQLDelegationTokenSecretManagerImpl secretManager = + (SQLDelegationTokenSecretManagerImpl) tokenManager1.getDelegationTokenSecretManager(); + Assert.assertEquals("Verify that the counter is set to the highest sequence number", + tokensPerManager * 3, secretManager.getDelegationTokenSeqNum()); + } finally { + stopTokenManager(tokenManager1); + stopTokenManager(tokenManager2); + stopTokenManager(tokenManager3); + } + } + + @Test + public void testSequenceNumRollover() throws Exception { + int tokenBatch = SQLDelegationTokenSecretManagerImpl.DEFAULT_SEQ_NUM_BATCH_SIZE; + Set sequenceNums = new HashSet<>(); + + DelegationTokenManager tokenManager = createTokenManager(); + + try { + SQLDelegationTokenSecretManagerImpl secretManager = + (SQLDelegationTokenSecretManagerImpl) tokenManager.getDelegationTokenSecretManager(); + secretManager.setDelegationTokenSeqNum(Integer.MAX_VALUE - tokenBatch); + + // Allocate sequence numbers before they are rolled over + for (int seqNum = Integer.MAX_VALUE - tokenBatch; seqNum < Integer.MAX_VALUE; seqNum++) { + allocateSequenceNum(tokenManager, sequenceNums); + Assert.assertTrue(sequenceNums.contains(seqNum + 1)); + } + + // Allocate sequence numbers after they are rolled over + for (int seqNum = 0; seqNum < tokenBatch; seqNum++) { + allocateSequenceNum(tokenManager, sequenceNums); + Assert.assertTrue(sequenceNums.contains(seqNum + 1)); + } + } finally { + stopTokenManager(tokenManager); + } + } + + @Test + public void testDelegationKeyAllocation() throws Exception { + DelegationTokenManager tokenManager1 = createTokenManager(); + + try { + SQLDelegationTokenSecretManagerImpl secretManager1 = + (SQLDelegationTokenSecretManagerImpl) tokenManager1.getDelegationTokenSecretManager(); + // Prevent delegation keys to roll for the rest of the test to avoid race conditions + // between the keys generated and the active keys in the database. + ((TestDelegationTokenSecretManager) secretManager1).lockKeyRoll(); + int keyId1 = secretManager1.getCurrentKeyId(); + + // Validate that latest key1 is assigned to tokenManager1 tokens + Token token1 = + tokenManager1.createToken(UserGroupInformation.getCurrentUser(), "foo"); + validateKeyId(token1, keyId1); + + DelegationTokenManager tokenManager2 = createTokenManager(); + try { + SQLDelegationTokenSecretManagerImpl secretManager2 = + (SQLDelegationTokenSecretManagerImpl) tokenManager2.getDelegationTokenSecretManager(); + // Prevent delegation keys to roll for the rest of the test + ((TestDelegationTokenSecretManager) secretManager2).lockKeyRoll(); + int keyId2 = secretManager2.getCurrentKeyId(); + + Assert.assertNotEquals("Each secret manager has its own key", keyId1, keyId2); + + // Validate that latest key2 is assigned to tokenManager2 tokens + Token token2 = + tokenManager2.createToken(UserGroupInformation.getCurrentUser(), "foo"); + validateKeyId(token2, keyId2); + + // Validate that key1 is still assigned to tokenManager1 tokens + token1 = tokenManager1.createToken(UserGroupInformation.getCurrentUser(), "foo"); + validateKeyId(token1, keyId1); + + // Validate that key2 is still assigned to tokenManager2 tokens + token2 = tokenManager2.createToken(UserGroupInformation.getCurrentUser(), "foo"); + validateKeyId(token2, keyId2); + } finally { + stopTokenManager(tokenManager2); + } + } finally { + stopTokenManager(tokenManager1); + } + } + + @Test + public void testHikariConfigs() { + HikariDataSourceConnectionFactory factory1 = new HikariDataSourceConnectionFactory(conf); + int defaultMaximumPoolSize = factory1.getDataSource().getMaximumPoolSize(); + factory1.shutdown(); + + // Changing default maximumPoolSize + Configuration hikariConf = new Configuration(conf); + hikariConf.setInt(HikariDataSourceConnectionFactory.HIKARI_PROPS + "maximumPoolSize", + defaultMaximumPoolSize + 1); + + // Verifying property is correctly set in datasource + HikariDataSourceConnectionFactory factory2 = new HikariDataSourceConnectionFactory(hikariConf); + Assert.assertEquals(factory2.getDataSource().getMaximumPoolSize(), + defaultMaximumPoolSize + 1); + factory2.shutdown(); + } + + @Test + public void testRetries() throws Exception { + DelegationTokenManager tokenManager = createTokenManager(); + TestDelegationTokenSecretManager secretManager = + (TestDelegationTokenSecretManager) tokenManager.getDelegationTokenSecretManager(); + + try { + // Prevent delegation keys to roll for the rest of the test + secretManager.lockKeyRoll(); + + // Reset counter and expect a single request when inserting a token + TestRetryHandler.resetExecutionAttemptCounter(); + tokenManager.createToken(UserGroupInformation.getCurrentUser(), "foo"); + Assert.assertEquals(1, TestRetryHandler.getExecutionAttempts()); + + // Breaking database connections to cause retries + secretManager.setReadOnly(true); + + // Reset counter and expect a multiple retries when failing to insert a token + TestRetryHandler.resetExecutionAttemptCounter(); + tokenManager.createToken(UserGroupInformation.getCurrentUser(), "foo"); + Assert.assertEquals(TEST_MAX_RETRIES + 1, TestRetryHandler.getExecutionAttempts()); + } finally { + // Fix database connections + secretManager.setReadOnly(false); + stopTokenManager(tokenManager); + } + } + + private DelegationTokenManager createTokenManager() { + DelegationTokenManager tokenManager = new DelegationTokenManager(new Configuration(), null); + tokenManager.setExternalDelegationTokenSecretManager(new TestDelegationTokenSecretManager()); + return tokenManager; + } + + private void allocateSequenceNum(DelegationTokenManager tokenManager, Set sequenceNums) + throws IOException { + Token token = + tokenManager.createToken(UserGroupInformation.getCurrentUser(), "foo"); + AbstractDelegationTokenIdentifier tokenIdentifier = token.decodeIdentifier(); + Assert.assertFalse("Verify sequence number is unique", + sequenceNums.contains(tokenIdentifier.getSequenceNumber())); + + sequenceNums.add(tokenIdentifier.getSequenceNumber()); + } + + private void validateToken(DelegationTokenManager tokenManager, + Token token) + throws Exception { + SQLDelegationTokenSecretManagerImpl secretManager = + (SQLDelegationTokenSecretManagerImpl) tokenManager.getDelegationTokenSecretManager(); + AbstractDelegationTokenIdentifier tokenIdentifier = token.decodeIdentifier(); + + // Verify token using token manager + tokenManager.verifyToken(token); + + byte[] tokenInfo1 = secretManager.selectTokenInfo(tokenIdentifier.getSequenceNumber(), + tokenIdentifier.getBytes()); + Assert.assertNotNull("Verify token exists in database", tokenInfo1); + + // Renew token using token manager + tokenManager.renewToken(token, "foo"); + + byte[] tokenInfo2 = secretManager.selectTokenInfo(tokenIdentifier.getSequenceNumber(), + tokenIdentifier.getBytes()); + Assert.assertNotNull("Verify token exists in database", tokenInfo2); + Assert.assertFalse("Verify token has been updated in database", + Arrays.equals(tokenInfo1, tokenInfo2)); + + // Cancel token using token manager + tokenManager.cancelToken(token, "foo"); + byte[] tokenInfo3 = secretManager.selectTokenInfo(tokenIdentifier.getSequenceNumber(), + tokenIdentifier.getBytes()); + Assert.assertNull("Verify token was removed from database", tokenInfo3); + } + + private void validateKeyId(Token token, + int expectedKeyiD) throws IOException { + AbstractDelegationTokenIdentifier tokenIdentifier = token.decodeIdentifier(); + Assert.assertEquals("Verify that keyId is assigned to token", + tokenIdentifier.getMasterKeyId(), expectedKeyiD); + } + + private static Connection getTestDBConnection() { + try { + return DriverManager.getConnection(CONNECTION_URL); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private static void createTestDBTables() throws SQLException { + execute("CREATE TABLE Tokens (sequenceNum INT NOT NULL, " + + "tokenIdentifier VARCHAR (255) FOR BIT DATA NOT NULL, " + + "tokenInfo VARCHAR (255) FOR BIT DATA NOT NULL, " + + "modifiedTime TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, " + + "PRIMARY KEY(sequenceNum))"); + execute("CREATE TABLE DelegationKeys (keyId INT NOT NULL, " + + "delegationKey VARCHAR (255) FOR BIT DATA NOT NULL, " + + "modifiedTime TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, " + + "PRIMARY KEY(keyId))"); + execute("CREATE TABLE LastSequenceNum (sequenceNum INT NOT NULL)"); + execute("INSERT INTO LastSequenceNum VALUES (0)"); + execute("CREATE TABLE LastDelegationKeyId (keyId INT NOT NULL)"); + execute("INSERT INTO LastDelegationKeyId VALUES (0)"); + } + + private static void dropTestDBTables() throws SQLException { + execute("DROP TABLE Tokens"); + execute("DROP TABLE DelegationKeys"); + execute("DROP TABLE LastSequenceNum"); + execute("DROP TABLE LastDelegationKeyId"); + } + + private static void execute(String statement) throws SQLException { + try (Connection connection = getTestDBConnection()) { + connection.createStatement().execute(statement); + } + } + + private void stopTokenManager(DelegationTokenManager tokenManager) { + TestDelegationTokenSecretManager secretManager = + (TestDelegationTokenSecretManager) tokenManager.getDelegationTokenSecretManager(); + // Release any locks on tables + secretManager.unlockKeyRoll(); + // Stop threads to close database connections + secretManager.stopThreads(); + } + + static class TestDelegationTokenSecretManager extends SQLDelegationTokenSecretManagerImpl { + private ReentrantLock keyRollLock; + + private synchronized ReentrantLock getKeyRollLock() { + if (keyRollLock == null) { + keyRollLock = new ReentrantLock(); + } + return keyRollLock; + } + + TestDelegationTokenSecretManager() { + super(conf, new TestConnectionFactory(conf), + SQLSecretManagerRetriableHandlerImpl.getInstance(conf, new TestRetryHandler())); + } + + // Tests can call this method to prevent delegation keys from + // being rolled in the middle of a test to prevent race conditions + public void lockKeyRoll() { + getKeyRollLock().lock(); + } + + public void unlockKeyRoll() { + if (getKeyRollLock().isHeldByCurrentThread()) { + getKeyRollLock().unlock(); + } + } + + @Override + protected void rollMasterKey() throws IOException { + try { + lockKeyRoll(); + super.rollMasterKey(); + } finally { + unlockKeyRoll(); + } + } + + public void setReadOnly(boolean readOnly) { + ((TestConnectionFactory) getConnectionFactory()).readOnly = readOnly; + } + } + + static class TestConnectionFactory extends HikariDataSourceConnectionFactory { + private boolean readOnly = false; + TestConnectionFactory(Configuration conf) { + super(conf); + } + + @Override + public Connection getConnection() throws SQLException { + Connection connection = super.getConnection(); + // Change to default schema as derby driver looks for user schema + connection.setSchema("APP"); + connection.setReadOnly(readOnly); + return connection; + } + } + + static class TestRetryHandler extends SQLSecretManagerRetriableHandlerImpl { + // Tracks the amount of times that a SQL command is executed, regardless of + // whether it completed successfully or not. + private static AtomicInteger executionAttemptCounter = new AtomicInteger(); + + static void resetExecutionAttemptCounter() { + executionAttemptCounter = new AtomicInteger(); + } + + static int getExecutionAttempts() { + return executionAttemptCounter.get(); + } + + @Override + public void execute(SQLCommandVoid command) throws SQLException { + executionAttemptCounter.incrementAndGet(); + super.execute(command); + } + } +} diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml index 9e12edaf55..550c716d48 100644 --- a/hadoop-project/pom.xml +++ b/hadoop-project/pom.xml @@ -129,6 +129,8 @@ 1.0-alpha-1 3.3.1 4.0.3 + 10.10.2.0 + 8.0.29 6.2.1.jre7 4.10.0 3.2.0 @@ -1809,6 +1811,16 @@ HikariCP ${hikari.version} + + org.apache.derby + derby + ${derby.version} + + + mysql + mysql-connector-java + ${mysql-connector-java.version} + com.microsoft.sqlserver mssql-jdbc