From 5272af8c7eab76d779c621eb0208bf29ffa25613 Mon Sep 17 00:00:00 2001 From: Carlo Curino Date: Tue, 25 Apr 2017 15:14:02 -0700 Subject: [PATCH] YARN-3663. Federation State and Policy Store (DBMS implementation). (Giovanni Matteo Fumarola via curino). (cherry picked from commit be99c1fe2eb150fabd69902118d65941f82971f6) --- LICENSE.txt | 1 + hadoop-project/pom.xml | 12 + .../hadoop/yarn/conf/YarnConfiguration.java | 23 + .../conf/TestYarnConfigurationFields.java | 14 + .../hadoop-yarn-server-common/pom.xml | 20 + .../FederationStateStoreErrorCode.java | 105 -- .../FederationStateStoreException.java | 17 +- .../impl/MemoryFederationStateStore.java | 81 +- .../store/impl/SQLFederationStateStore.java | 937 ++++++++++++++++++ .../store/records/SubClusterState.java | 21 + ...tionHomeSubClusterStoreInputValidator.java | 12 +- ...ionMembershipStateStoreInputValidator.java | 14 +- .../FederationPolicyStoreInputValidator.java | 6 +- .../utils/FederationStateStoreUtils.java | 109 +- .../impl/FederationStateStoreBaseTest.java | 74 +- .../impl/HSQLDBFederationStateStore.java | 252 +++++ .../impl/TestMemoryFederationStateStore.java | 3 +- .../impl/TestSQLFederationStateStore.java | 49 + ...estFederationStateStoreInputValidator.java | 146 +-- .../TestFederationStateStoreFacadeRetry.java | 7 +- .../FederationStateStoreStoreProcs.sql | 511 ++++++++++ .../SQLServer/FederationStateStoreTables.sql | 122 +++ 22 files changed, 2228 insertions(+), 308 deletions(-) delete mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/exception/FederationStateStoreErrorCode.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/HSQLDBFederationStateStore.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestSQLFederationStateStore.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/resources/FederationStateStore/SQLServer/FederationStateStoreStoreProcs.sql create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/resources/FederationStateStore/SQLServer/FederationStateStoreTables.sql diff --git a/LICENSE.txt b/LICENSE.txt index 5391fd5620..af3175aedd 100644 --- a/LICENSE.txt +++ b/LICENSE.txt @@ -698,6 +698,7 @@ hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/static/jquery-1.10.2.min.js hadoop-tools/hadoop-sls/src/main/html/js/thirdparty/jquery.js hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/webapps/static/jquery Apache HBase - Server which contains JQuery minified javascript library version 1.8.3 +Microsoft SQLServer - JDBC version 6.1.0.jre7 -------------------------------------------------------------------------------- Copyright 2005, 2012, 2013 jQuery Foundation and other contributors, https://jquery.org/ diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml index 5395a10e0b..15bd1fa1d5 100755 --- a/hadoop-project/pom.xml +++ b/hadoop-project/pom.xml @@ -100,6 +100,8 @@ 1.0.0 3.0.3 + 2.4.11 + 6.1.0.jre7 1.8 @@ -1284,6 +1286,16 @@ ehcache ${ehcache.version} + + com.zaxxer + HikariCP-java7 + ${hikari.version} + + + com.microsoft.sqlserver + mssql-jdbc + ${mssql.version} + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 987f8cf2c7..7bcb12357d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -2614,6 +2614,29 @@ public static boolean isAclEnabled(Configuration conf) { public static final String DEFAULT_FEDERATION_POLICY_MANAGER_PARAMS = ""; + private static final String FEDERATION_STATESTORE_SQL_PREFIX = + FEDERATION_PREFIX + "state-store.sql."; + + public static final String FEDERATION_STATESTORE_SQL_USERNAME = + FEDERATION_STATESTORE_SQL_PREFIX + "username"; + + public static final String FEDERATION_STATESTORE_SQL_PASSWORD = + FEDERATION_STATESTORE_SQL_PREFIX + "password"; + + public static final String FEDERATION_STATESTORE_SQL_URL = + FEDERATION_STATESTORE_SQL_PREFIX + "url"; + + public static final String FEDERATION_STATESTORE_SQL_JDBC_CLASS = + FEDERATION_STATESTORE_SQL_PREFIX + "jdbc-class"; + + public static final String DEFAULT_FEDERATION_STATESTORE_SQL_JDBC_CLASS = + "org.hsqldb.jdbc.JDBCDataSource"; + + public static final String FEDERATION_STATESTORE_SQL_MAXCONNECTIONS = + FEDERATION_STATESTORE_SQL_PREFIX + "max-connections"; + + public static final int DEFAULT_FEDERATION_STATESTORE_SQL_MAXCONNECTIONS = 1; + //////////////////////////////// // Other Configs //////////////////////////////// diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java index 6e33c0aa40..c3cb78d806 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java @@ -90,6 +90,20 @@ public void initializeMemberVariables() { configurationPropsToSkipCompare .add(YarnConfiguration.DEFAULT_FEDERATION_POLICY_MANAGER_PARAMS); + // Federation StateStore SQL implementation configs to be ignored + configurationPropsToSkipCompare + .add(YarnConfiguration.FEDERATION_STATESTORE_SQL_JDBC_CLASS); + configurationPropsToSkipCompare + .add(YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_SQL_JDBC_CLASS); + configurationPropsToSkipCompare + .add(YarnConfiguration.FEDERATION_STATESTORE_SQL_USERNAME); + configurationPropsToSkipCompare + .add(YarnConfiguration.FEDERATION_STATESTORE_SQL_PASSWORD); + configurationPropsToSkipCompare + .add(YarnConfiguration.FEDERATION_STATESTORE_SQL_URL); + configurationPropsToSkipCompare + .add(YarnConfiguration.FEDERATION_STATESTORE_SQL_MAXCONNECTIONS); + // Ignore blacklisting nodes for AM failures feature since it is still a // "work in progress" configurationPropsToSkipCompare.add(YarnConfiguration. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml index 6cf41e7bce..c9f6d79683 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml @@ -110,6 +110,26 @@ org.ehcache ehcache + + com.zaxxer + HikariCP-java7 + + + org.hsqldb + hsqldb + test + + + com.microsoft.sqlserver + mssql-jdbc + runtime + + + com.microsoft.azure + azure-keyvault + + + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/exception/FederationStateStoreErrorCode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/exception/FederationStateStoreErrorCode.java deleted file mode 100644 index 88e2d3a044..0000000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/exception/FederationStateStoreErrorCode.java +++ /dev/null @@ -1,105 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.federation.store.exception; - -import org.apache.hadoop.classification.InterfaceAudience.Public; -import org.apache.hadoop.classification.InterfaceStability.Unstable; - -/** - *

- * Logical error codes from FederationStateStore. - *

- */ -@Public -@Unstable -public enum FederationStateStoreErrorCode { - - MEMBERSHIP_INSERT_FAIL(1101, "Fail to insert a tuple into Membership table."), - - MEMBERSHIP_DELETE_FAIL(1102, "Fail to delete a tuple from Membership table."), - - MEMBERSHIP_SINGLE_SELECT_FAIL(1103, - "Fail to select a tuple from Membership table."), - - MEMBERSHIP_MULTIPLE_SELECT_FAIL(1104, - "Fail to select multiple tuples from Membership table."), - - MEMBERSHIP_UPDATE_DEREGISTER_FAIL(1105, - "Fail to update/deregister a tuple in Membership table."), - - MEMBERSHIP_UPDATE_HEARTBEAT_FAIL(1106, - "Fail to update/heartbeat a tuple in Membership table."), - - APPLICATIONS_INSERT_FAIL(1201, - "Fail to insert a tuple into ApplicationsHomeSubCluster table."), - - APPLICATIONS_DELETE_FAIL(1202, - "Fail to delete a tuple from ApplicationsHomeSubCluster table"), - - APPLICATIONS_SINGLE_SELECT_FAIL(1203, - "Fail to select a tuple from ApplicationsHomeSubCluster table."), - - APPLICATIONS_MULTIPLE_SELECT_FAIL(1204, - "Fail to select multiple tuple from ApplicationsHomeSubCluster table."), - - APPLICATIONS_UPDATE_FAIL(1205, - "Fail to update a tuple in ApplicationsHomeSubCluster table."), - - POLICY_INSERT_FAIL(1301, "Fail to insert a tuple into Policy table."), - - POLICY_DELETE_FAIL(1302, "Fail to delete a tuple from Membership table."), - - POLICY_SINGLE_SELECT_FAIL(1303, "Fail to select a tuple from Policy table."), - - POLICY_MULTIPLE_SELECT_FAIL(1304, - "Fail to select multiple tuples from Policy table."), - - POLICY_UPDATE_FAIL(1305, "Fail to update a tuple in Policy table."); - - private final int id; - private final String msg; - - FederationStateStoreErrorCode(int id, String msg) { - this.id = id; - this.msg = msg; - } - - /** - * Get the error code related to the FederationStateStore failure. - * - * @return the error code related to the FederationStateStore failure. - */ - public int getId() { - return this.id; - } - - /** - * Get the error message related to the FederationStateStore failure. - * - * @return the error message related to the FederationStateStore failure. - */ - public String getMsg() { - return this.msg; - } - - @Override - public String toString() { - return "\nError Code: " + this.id + "\nError Message: " + this.msg; - } -} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/exception/FederationStateStoreException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/exception/FederationStateStoreException.java index 81a9e99826..1013ec6df2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/exception/FederationStateStoreException.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/exception/FederationStateStoreException.java @@ -31,15 +31,20 @@ public class FederationStateStoreException extends YarnException { */ private static final long serialVersionUID = -6453353714832159296L; - private FederationStateStoreErrorCode code; - - public FederationStateStoreException(FederationStateStoreErrorCode code) { + public FederationStateStoreException() { super(); - this.code = code; } - public FederationStateStoreErrorCode getCode() { - return code; + public FederationStateStoreException(String message) { + super(message); + } + + public FederationStateStoreException(Throwable cause) { + super(cause); + } + + public FederationStateStoreException(String message, Throwable cause) { + super(message, cause); } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java index 127bf82941..fbdb7bff90 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java @@ -18,21 +18,17 @@ package org.apache.hadoop.yarn.server.federation.store.impl; import java.util.ArrayList; +import java.util.Calendar; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.TimeZone; import java.util.concurrent.ConcurrentHashMap; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; -import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; -import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration; import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; -import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreErrorCode; -import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest; -import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterResponse; import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest; import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterResponse; import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster; @@ -52,8 +48,13 @@ import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse; import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest; import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationResponse; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterResponse; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatRequest; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatResponse; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterResponse; import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterRequest; @@ -98,16 +99,18 @@ public void close() { @Override public SubClusterRegisterResponse registerSubCluster( SubClusterRegisterRequest request) throws YarnException { - FederationMembershipStateStoreInputValidator - .validateSubClusterRegisterRequest(request); + FederationMembershipStateStoreInputValidator.validate(request); SubClusterInfo subClusterInfo = request.getSubClusterInfo(); + long currentTime = + Calendar.getInstance(TimeZone.getTimeZone("UTC")).getTimeInMillis(); + SubClusterInfo subClusterInfoToSave = SubClusterInfo.newInstance(subClusterInfo.getSubClusterId(), subClusterInfo.getAMRMServiceAddress(), subClusterInfo.getClientRMServiceAddress(), subClusterInfo.getRMAdminServiceAddress(), - subClusterInfo.getRMWebServiceAddress(), clock.getTime(), + subClusterInfo.getRMWebServiceAddress(), currentTime, subClusterInfo.getState(), subClusterInfo.getLastStartTime(), subClusterInfo.getCapability()); @@ -118,15 +121,12 @@ public SubClusterRegisterResponse registerSubCluster( @Override public SubClusterDeregisterResponse deregisterSubCluster( SubClusterDeregisterRequest request) throws YarnException { - FederationMembershipStateStoreInputValidator - .validateSubClusterDeregisterRequest(request); + FederationMembershipStateStoreInputValidator.validate(request); SubClusterInfo subClusterInfo = membership.get(request.getSubClusterId()); if (subClusterInfo == null) { String errMsg = "SubCluster " + request.getSubClusterId().toString() + " not found"; - FederationStateStoreUtils.logAndThrowStoreException(LOG, - FederationStateStoreErrorCode.MEMBERSHIP_UPDATE_DEREGISTER_FAIL, - errMsg); + FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); } else { subClusterInfo.setState(request.getState()); } @@ -138,20 +138,20 @@ public SubClusterDeregisterResponse deregisterSubCluster( public SubClusterHeartbeatResponse subClusterHeartbeat( SubClusterHeartbeatRequest request) throws YarnException { - FederationMembershipStateStoreInputValidator - .validateSubClusterHeartbeatRequest(request); + FederationMembershipStateStoreInputValidator.validate(request); SubClusterId subClusterId = request.getSubClusterId(); SubClusterInfo subClusterInfo = membership.get(subClusterId); if (subClusterInfo == null) { - String errMsg = "Subcluster " + subClusterId.toString() + String errMsg = "SubCluster " + subClusterId.toString() + " does not exist; cannot heartbeat"; - FederationStateStoreUtils.logAndThrowStoreException(LOG, - FederationStateStoreErrorCode.MEMBERSHIP_UPDATE_HEARTBEAT_FAIL, - errMsg); + FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); } - subClusterInfo.setLastHeartBeat(clock.getTime()); + long currentTime = + Calendar.getInstance(TimeZone.getTimeZone("UTC")).getTimeInMillis(); + + subClusterInfo.setLastHeartBeat(currentTime); subClusterInfo.setState(request.getState()); subClusterInfo.setCapability(request.getCapability()); @@ -162,14 +162,12 @@ public SubClusterHeartbeatResponse subClusterHeartbeat( public GetSubClusterInfoResponse getSubCluster( GetSubClusterInfoRequest request) throws YarnException { - FederationMembershipStateStoreInputValidator - .validateGetSubClusterInfoRequest(request); + FederationMembershipStateStoreInputValidator.validate(request); SubClusterId subClusterId = request.getSubClusterId(); if (!membership.containsKey(subClusterId)) { String errMsg = - "Subcluster " + subClusterId.toString() + " does not exist"; - FederationStateStoreUtils.logAndThrowStoreException(LOG, - FederationStateStoreErrorCode.MEMBERSHIP_SINGLE_SELECT_FAIL, errMsg); + "SubCluster " + subClusterId.toString() + " does not exist"; + FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); } return GetSubClusterInfoResponse.newInstance(membership.get(subClusterId)); @@ -195,8 +193,7 @@ public GetSubClustersInfoResponse getSubClusters( public AddApplicationHomeSubClusterResponse addApplicationHomeSubCluster( AddApplicationHomeSubClusterRequest request) throws YarnException { - FederationApplicationHomeSubClusterStoreInputValidator - .validateAddApplicationHomeSubClusterRequest(request); + FederationApplicationHomeSubClusterStoreInputValidator.validate(request); ApplicationId appId = request.getApplicationHomeSubCluster().getApplicationId(); @@ -213,14 +210,12 @@ public AddApplicationHomeSubClusterResponse addApplicationHomeSubCluster( public UpdateApplicationHomeSubClusterResponse updateApplicationHomeSubCluster( UpdateApplicationHomeSubClusterRequest request) throws YarnException { - FederationApplicationHomeSubClusterStoreInputValidator - .validateUpdateApplicationHomeSubClusterRequest(request); + FederationApplicationHomeSubClusterStoreInputValidator.validate(request); ApplicationId appId = request.getApplicationHomeSubCluster().getApplicationId(); if (!applications.containsKey(appId)) { String errMsg = "Application " + appId + " does not exist"; - FederationStateStoreUtils.logAndThrowStoreException(LOG, - FederationStateStoreErrorCode.APPLICATIONS_UPDATE_FAIL, errMsg); + FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); } applications.put(appId, @@ -232,14 +227,11 @@ public UpdateApplicationHomeSubClusterResponse updateApplicationHomeSubCluster( public GetApplicationHomeSubClusterResponse getApplicationHomeSubCluster( GetApplicationHomeSubClusterRequest request) throws YarnException { - FederationApplicationHomeSubClusterStoreInputValidator - .validateGetApplicationHomeSubClusterRequest(request); + FederationApplicationHomeSubClusterStoreInputValidator.validate(request); ApplicationId appId = request.getApplicationId(); if (!applications.containsKey(appId)) { String errMsg = "Application " + appId + " does not exist"; - FederationStateStoreUtils.logAndThrowStoreException(LOG, - FederationStateStoreErrorCode.APPLICATIONS_SINGLE_SELECT_FAIL, - errMsg); + FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); } return GetApplicationHomeSubClusterResponse.newInstance( @@ -264,13 +256,11 @@ public GetApplicationsHomeSubClusterResponse getApplicationsHomeSubCluster( public DeleteApplicationHomeSubClusterResponse deleteApplicationHomeSubCluster( DeleteApplicationHomeSubClusterRequest request) throws YarnException { - FederationApplicationHomeSubClusterStoreInputValidator - .validateDeleteApplicationHomeSubClusterRequest(request); + FederationApplicationHomeSubClusterStoreInputValidator.validate(request); ApplicationId appId = request.getApplicationId(); if (!applications.containsKey(appId)) { String errMsg = "Application " + appId + " does not exist"; - FederationStateStoreUtils.logAndThrowStoreException(LOG, - FederationStateStoreErrorCode.APPLICATIONS_DELETE_FAIL, errMsg); + FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); } applications.remove(appId); @@ -281,13 +271,11 @@ public DeleteApplicationHomeSubClusterResponse deleteApplicationHomeSubCluster( public GetSubClusterPolicyConfigurationResponse getPolicyConfiguration( GetSubClusterPolicyConfigurationRequest request) throws YarnException { - FederationPolicyStoreInputValidator - .validateGetSubClusterPolicyConfigurationRequest(request); + FederationPolicyStoreInputValidator.validate(request); String queue = request.getQueue(); if (!policies.containsKey(queue)) { String errMsg = "Policy for queue " + queue + " does not exist"; - FederationStateStoreUtils.logAndThrowStoreException(LOG, - FederationStateStoreErrorCode.POLICY_SINGLE_SELECT_FAIL, errMsg); + FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); } return GetSubClusterPolicyConfigurationResponse @@ -298,8 +286,7 @@ public GetSubClusterPolicyConfigurationResponse getPolicyConfiguration( public SetSubClusterPolicyConfigurationResponse setPolicyConfiguration( SetSubClusterPolicyConfigurationRequest request) throws YarnException { - FederationPolicyStoreInputValidator - .validateSetSubClusterPolicyConfigurationRequest(request); + FederationPolicyStoreInputValidator.validate(request); policies.put(request.getPolicyConfiguration().getQueue(), request.getPolicyConfiguration()); return SetSubClusterPolicyConfigurationResponse.newInstance(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java new file mode 100644 index 0000000000..a849c6a3d2 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java @@ -0,0 +1,937 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.federation.store.impl; + +import java.nio.ByteBuffer; +import java.sql.CallableStatement; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.Calendar; +import java.util.List; +import java.util.TimeZone; + +import org.apache.commons.lang.NotImplementedException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; +import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreInvalidInputException; +import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterResponse; +import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster; +import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterResponse; +import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse; +import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterResponse; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoResponse; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsResponse; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationResponse; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse; +import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest; +import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationResponse; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterResponse; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatRequest; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatResponse; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterResponse; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState; +import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterResponse; +import org.apache.hadoop.yarn.server.federation.store.utils.FederationApplicationHomeSubClusterStoreInputValidator; +import org.apache.hadoop.yarn.server.federation.store.utils.FederationMembershipStateStoreInputValidator; +import org.apache.hadoop.yarn.server.federation.store.utils.FederationPolicyStoreInputValidator; +import org.apache.hadoop.yarn.server.federation.store.utils.FederationStateStoreUtils; +import org.apache.hadoop.yarn.server.records.Version; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.zaxxer.hikari.HikariDataSource; + +/** + * SQL implementation of {@link FederationStateStore}. + */ +public class SQLFederationStateStore implements FederationStateStore { + + public static final Logger LOG = + LoggerFactory.getLogger(SQLFederationStateStore.class); + + // Stored procedures patterns + + private static final String CALL_SP_REGISTER_SUBCLUSTER = + "{call sp_registerSubCluster(?, ?, ?, ?, ?, ?, ?, ?, ?)}"; + + private static final String CALL_SP_DEREGISTER_SUBCLUSTER = + "{call sp_deregisterSubCluster(?, ?, ?)}"; + + private static final String CALL_SP_GET_SUBCLUSTER = + "{call sp_getSubCluster(?, ?, ?, ?, ?, ?, ?, ?, ?)}"; + + private static final String CALL_SP_GET_SUBCLUSTERS = + "{call sp_getSubClusters()}"; + + private static final String CALL_SP_SUBCLUSTER_HEARTBEAT = + "{call sp_subClusterHeartbeat(?, ?, ?, ?)}"; + + private static final String CALL_SP_ADD_APPLICATION_HOME_SUBCLUSTER = + "{call sp_addApplicationHomeSubCluster(?, ?, ?, ?)}"; + + private static final String CALL_SP_UPDATE_APPLICATION_HOME_SUBCLUSTER = + "{call sp_updateApplicationHomeSubCluster(?, ?, ?)}"; + + private static final String CALL_SP_DELETE_APPLICATION_HOME_SUBCLUSTER = + "{call sp_deleteApplicationHomeSubCluster(?, ?)}"; + + private static final String CALL_SP_GET_APPLICATION_HOME_SUBCLUSTER = + "{call sp_getApplicationHomeSubCluster(?, ?)}"; + + private static final String CALL_SP_GET_APPLICATIONS_HOME_SUBCLUSTER = + "{call sp_getApplicationsHomeSubCluster()}"; + + private static final String CALL_SP_SET_POLICY_CONFIGURATION = + "{call sp_setPolicyConfiguration(?, ?, ?, ?)}"; + + private static final String CALL_SP_GET_POLICY_CONFIGURATION = + "{call sp_getPolicyConfiguration(?, ?, ?)}"; + + private static final String CALL_SP_GET_POLICIES_CONFIGURATIONS = + "{call sp_getPoliciesConfigurations()}"; + + private Calendar utcCalendar = + Calendar.getInstance(TimeZone.getTimeZone("UTC")); + + // SQL database configurations + + private String userName; + private String password; + private String driverClass; + private String url; + private int maximumPoolSize; + private HikariDataSource dataSource = null; + + @Override + public void init(Configuration conf) throws YarnException { + driverClass = + conf.get(YarnConfiguration.FEDERATION_STATESTORE_SQL_JDBC_CLASS, + YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_SQL_JDBC_CLASS); + maximumPoolSize = + conf.getInt(YarnConfiguration.FEDERATION_STATESTORE_SQL_MAXCONNECTIONS, + YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_SQL_MAXCONNECTIONS); + + // An helper method avoids to assign a null value to these property + userName = conf.get(YarnConfiguration.FEDERATION_STATESTORE_SQL_USERNAME); + password = conf.get(YarnConfiguration.FEDERATION_STATESTORE_SQL_PASSWORD); + url = conf.get(YarnConfiguration.FEDERATION_STATESTORE_SQL_URL); + + try { + Class.forName(driverClass); + } catch (ClassNotFoundException e) { + FederationStateStoreUtils.logAndThrowException(LOG, + "Driver class not found.", e); + } + + // Create the data source to pool connections in a thread-safe manner + dataSource = new HikariDataSource(); + dataSource.setDataSourceClassName(driverClass); + FederationStateStoreUtils.setUsername(dataSource, userName); + FederationStateStoreUtils.setPassword(dataSource, password); + FederationStateStoreUtils.setProperty(dataSource, + FederationStateStoreUtils.FEDERATION_STORE_URL, url); + dataSource.setMaximumPoolSize(maximumPoolSize); + LOG.info("Initialized connection pool to the Federation StateStore " + + "database at address: " + url); + } + + @Override + public SubClusterRegisterResponse registerSubCluster( + SubClusterRegisterRequest registerSubClusterRequest) + throws YarnException { + + // Input validator + FederationMembershipStateStoreInputValidator + .validate(registerSubClusterRequest); + + CallableStatement cstmt = null; + Connection conn = null; + + SubClusterInfo subClusterInfo = + registerSubClusterRequest.getSubClusterInfo(); + SubClusterId subClusterId = subClusterInfo.getSubClusterId(); + + try { + conn = getConnection(); + cstmt = conn.prepareCall(CALL_SP_REGISTER_SUBCLUSTER); + + // Set the parameters for the stored procedure + cstmt.setString(1, subClusterId.getId()); + cstmt.setString(2, subClusterInfo.getAMRMServiceAddress()); + cstmt.setString(3, subClusterInfo.getClientRMServiceAddress()); + cstmt.setString(4, subClusterInfo.getRMAdminServiceAddress()); + cstmt.setString(5, subClusterInfo.getRMWebServiceAddress()); + cstmt.setString(6, subClusterInfo.getState().toString()); + cstmt.setLong(7, subClusterInfo.getLastStartTime()); + cstmt.setString(8, subClusterInfo.getCapability()); + cstmt.registerOutParameter(9, java.sql.Types.INTEGER); + + // Execute the query + cstmt.executeUpdate(); + + // Check the ROWCOUNT value, if it is equal to 0 it means the call + // did not add a new subcluster into FederationStateStore + if (cstmt.getInt(9) == 0) { + String errMsg = "SubCluster " + subClusterId + + " was not registered into the StateStore"; + FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); + } + // Check the ROWCOUNT value, if it is different from 1 it means the call + // had a wrong behavior. Maybe the database is not set correctly. + if (cstmt.getInt(9) != 1) { + String errMsg = "Wrong behavior during registration of SubCluster " + + subClusterId + " into the StateStore"; + FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); + } + + LOG.info( + "Registered the SubCluster " + subClusterId + " into the StateStore"); + + } catch (SQLException e) { + FederationStateStoreUtils.logAndThrowRetriableException(LOG, + "Unable to register the SubCluster " + subClusterId + + " into the StateStore", + e); + } finally { + // Return to the pool the CallableStatement and the Connection + FederationStateStoreUtils.returnToPool(LOG, cstmt, conn); + } + return SubClusterRegisterResponse.newInstance(); + } + + @Override + public SubClusterDeregisterResponse deregisterSubCluster( + SubClusterDeregisterRequest subClusterDeregisterRequest) + throws YarnException { + + // Input validator + FederationMembershipStateStoreInputValidator + .validate(subClusterDeregisterRequest); + + CallableStatement cstmt = null; + Connection conn = null; + + SubClusterId subClusterId = subClusterDeregisterRequest.getSubClusterId(); + SubClusterState state = subClusterDeregisterRequest.getState(); + + try { + conn = getConnection(); + cstmt = conn.prepareCall(CALL_SP_DEREGISTER_SUBCLUSTER); + + // Set the parameters for the stored procedure + cstmt.setString(1, subClusterId.getId()); + cstmt.setString(2, state.toString()); + cstmt.registerOutParameter(3, java.sql.Types.INTEGER); + + // Execute the query + cstmt.executeUpdate(); + + // Check the ROWCOUNT value, if it is equal to 0 it means the call + // did not deregister the subcluster into FederationStateStore + if (cstmt.getInt(3) == 0) { + String errMsg = "SubCluster " + subClusterId + " not found"; + FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); + } + // Check the ROWCOUNT value, if it is different from 1 it means the call + // had a wrong behavior. Maybe the database is not set correctly. + if (cstmt.getInt(3) != 1) { + String errMsg = "Wrong behavior during deregistration of SubCluster " + + subClusterId + " from the StateStore"; + FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); + } + + LOG.info("Deregistered the SubCluster " + subClusterId + " state to " + + state.toString()); + + } catch (SQLException e) { + FederationStateStoreUtils.logAndThrowRetriableException(LOG, + "Unable to deregister the sub-cluster " + subClusterId + " state to " + + state.toString(), + e); + } finally { + // Return to the pool the CallableStatement and the Connection + FederationStateStoreUtils.returnToPool(LOG, cstmt, conn); + } + return SubClusterDeregisterResponse.newInstance(); + } + + @Override + public SubClusterHeartbeatResponse subClusterHeartbeat( + SubClusterHeartbeatRequest subClusterHeartbeatRequest) + throws YarnException { + + // Input validator + FederationMembershipStateStoreInputValidator + .validate(subClusterHeartbeatRequest); + + CallableStatement cstmt = null; + Connection conn = null; + + SubClusterId subClusterId = subClusterHeartbeatRequest.getSubClusterId(); + SubClusterState state = subClusterHeartbeatRequest.getState(); + + try { + conn = getConnection(); + cstmt = conn.prepareCall(CALL_SP_SUBCLUSTER_HEARTBEAT); + + // Set the parameters for the stored procedure + cstmt.setString(1, subClusterId.getId()); + cstmt.setString(2, state.toString()); + cstmt.setString(3, subClusterHeartbeatRequest.getCapability()); + cstmt.registerOutParameter(4, java.sql.Types.INTEGER); + + // Execute the query + cstmt.executeUpdate(); + + // Check the ROWCOUNT value, if it is equal to 0 it means the call + // did not update the subcluster into FederationStateStore + if (cstmt.getInt(4) == 0) { + String errMsg = "SubCluster " + subClusterId.toString() + + " does not exist; cannot heartbeat"; + FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); + } + // Check the ROWCOUNT value, if it is different from 1 it means the call + // had a wrong behavior. Maybe the database is not set correctly. + if (cstmt.getInt(4) != 1) { + String errMsg = + "Wrong behavior during the heartbeat of SubCluster " + subClusterId; + FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); + } + + LOG.info("Heartbeated the StateStore for the specified SubCluster " + + subClusterId); + + } catch (SQLException e) { + FederationStateStoreUtils.logAndThrowRetriableException(LOG, + "Unable to heartbeat the StateStore for the specified SubCluster " + + subClusterId, + e); + } finally { + // Return to the pool the CallableStatement and the Connection + FederationStateStoreUtils.returnToPool(LOG, cstmt, conn); + } + return SubClusterHeartbeatResponse.newInstance(); + } + + @Override + public GetSubClusterInfoResponse getSubCluster( + GetSubClusterInfoRequest subClusterRequest) throws YarnException { + + // Input validator + FederationMembershipStateStoreInputValidator.validate(subClusterRequest); + + CallableStatement cstmt = null; + Connection conn = null; + + SubClusterInfo subClusterInfo = null; + SubClusterId subClusterId = subClusterRequest.getSubClusterId(); + + try { + conn = getConnection(); + cstmt = conn.prepareCall(CALL_SP_GET_SUBCLUSTER); + cstmt.setString(1, subClusterId.getId()); + + // Set the parameters for the stored procedure + cstmt.registerOutParameter(2, java.sql.Types.VARCHAR); + cstmt.registerOutParameter(3, java.sql.Types.VARCHAR); + cstmt.registerOutParameter(4, java.sql.Types.VARCHAR); + cstmt.registerOutParameter(5, java.sql.Types.VARCHAR); + cstmt.registerOutParameter(6, java.sql.Types.TIMESTAMP); + cstmt.registerOutParameter(7, java.sql.Types.VARCHAR); + cstmt.registerOutParameter(8, java.sql.Types.BIGINT); + cstmt.registerOutParameter(9, java.sql.Types.VARCHAR); + + // Execute the query + cstmt.execute(); + + String amRMAddress = cstmt.getString(2); + String clientRMAddress = cstmt.getString(3); + String rmAdminAddress = cstmt.getString(4); + String webAppAddress = cstmt.getString(5); + + Timestamp heartBeatTimeStamp = cstmt.getTimestamp(6, utcCalendar); + long lastHeartBeat = + heartBeatTimeStamp != null ? heartBeatTimeStamp.getTime() : 0; + + SubClusterState state = SubClusterState.fromString(cstmt.getString(7)); + long lastStartTime = cstmt.getLong(8); + String capability = cstmt.getString(9); + + subClusterInfo = SubClusterInfo.newInstance(subClusterId, amRMAddress, + clientRMAddress, rmAdminAddress, webAppAddress, lastHeartBeat, state, + lastStartTime, capability); + + // Check if the output it is a valid subcluster + try { + FederationMembershipStateStoreInputValidator + .checkSubClusterInfo(subClusterInfo); + } catch (FederationStateStoreInvalidInputException e) { + String errMsg = + "SubCluster " + subClusterId.toString() + " does not exist"; + FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); + } + if (LOG.isDebugEnabled()) { + LOG.debug("Got the information about the specified SubCluster " + + subClusterInfo.toString()); + } + } catch (SQLException e) { + FederationStateStoreUtils.logAndThrowRetriableException(LOG, + "Unable to obtain the SubCluster information for " + subClusterId, e); + } finally { + // Return to the pool the CallableStatement and the Connection + FederationStateStoreUtils.returnToPool(LOG, cstmt, conn); + } + return GetSubClusterInfoResponse.newInstance(subClusterInfo); + } + + @Override + public GetSubClustersInfoResponse getSubClusters( + GetSubClustersInfoRequest subClustersRequest) throws YarnException { + CallableStatement cstmt = null; + Connection conn = null; + ResultSet rs = null; + List subClusters = new ArrayList(); + + try { + conn = getConnection(); + cstmt = conn.prepareCall(CALL_SP_GET_SUBCLUSTERS); + + // Execute the query + rs = cstmt.executeQuery(); + + while (rs.next()) { + + // Extract the output for each tuple + String subClusterName = rs.getString(1); + String amRMAddress = rs.getString(2); + String clientRMAddress = rs.getString(3); + String rmAdminAddress = rs.getString(4); + String webAppAddress = rs.getString(5); + long lastHeartBeat = rs.getTimestamp(6, utcCalendar).getTime(); + SubClusterState state = SubClusterState.fromString(rs.getString(7)); + long lastStartTime = rs.getLong(8); + String capability = rs.getString(9); + + SubClusterId subClusterId = SubClusterId.newInstance(subClusterName); + SubClusterInfo subClusterInfo = SubClusterInfo.newInstance(subClusterId, + amRMAddress, clientRMAddress, rmAdminAddress, webAppAddress, + lastHeartBeat, state, lastStartTime, capability); + + // Check if the output it is a valid subcluster + try { + FederationMembershipStateStoreInputValidator + .checkSubClusterInfo(subClusterInfo); + } catch (FederationStateStoreInvalidInputException e) { + String errMsg = + "SubCluster " + subClusterId.toString() + " is not valid"; + FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); + } + + // Filter the inactive + if (!subClustersRequest.getFilterInactiveSubClusters() + || subClusterInfo.getState().isActive()) { + subClusters.add(subClusterInfo); + } + } + + } catch (SQLException e) { + FederationStateStoreUtils.logAndThrowRetriableException(LOG, + "Unable to obtain the information for all the SubClusters ", e); + } finally { + // Return to the pool the CallableStatement and the Connection + FederationStateStoreUtils.returnToPool(LOG, cstmt, conn, rs); + } + return GetSubClustersInfoResponse.newInstance(subClusters); + } + + @Override + public AddApplicationHomeSubClusterResponse addApplicationHomeSubCluster( + AddApplicationHomeSubClusterRequest request) throws YarnException { + + // Input validator + FederationApplicationHomeSubClusterStoreInputValidator.validate(request); + + CallableStatement cstmt = null; + Connection conn = null; + + String subClusterHome = null; + ApplicationId appId = + request.getApplicationHomeSubCluster().getApplicationId(); + SubClusterId subClusterId = + request.getApplicationHomeSubCluster().getHomeSubCluster(); + + try { + conn = getConnection(); + cstmt = conn.prepareCall(CALL_SP_ADD_APPLICATION_HOME_SUBCLUSTER); + + // Set the parameters for the stored procedure + cstmt.setString(1, appId.toString()); + cstmt.setString(2, subClusterId.getId()); + cstmt.registerOutParameter(3, java.sql.Types.VARCHAR); + cstmt.registerOutParameter(4, java.sql.Types.INTEGER); + + // Execute the query + cstmt.executeUpdate(); + + subClusterHome = cstmt.getString(3); + SubClusterId subClusterIdHome = SubClusterId.newInstance(subClusterHome); + + // For failover reason, we check the returned SubClusterId. + // If it is equal to the subclusterId we sent, the call added the new + // application into FederationStateStore. If the call returns a different + // SubClusterId it means we already tried to insert this application but a + // component (Router/StateStore/RM) failed during the submission. + if (subClusterId.equals(subClusterIdHome)) { + // Check the ROWCOUNT value, if it is equal to 0 it means the call + // did not add a new application into FederationStateStore + if (cstmt.getInt(4) == 0) { + String errMsg = "The application " + appId + + " was not insert into the StateStore"; + FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); + } + // Check the ROWCOUNT value, if it is different from 1 it means the call + // had a wrong behavior. Maybe the database is not set correctly. + if (cstmt.getInt(4) != 1) { + String errMsg = "Wrong behavior during the insertion of SubCluster " + + subClusterId; + FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); + } + + LOG.info("Insert into the StateStore the application: " + appId + + " in SubCluster: " + subClusterHome); + } else { + // Check the ROWCOUNT value, if it is different from 0 it means the call + // did edited the table + if (cstmt.getInt(4) != 0) { + String errMsg = + "The application " + appId + " does exist but was overwritten"; + FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); + } + LOG.info("Application: " + appId + " already present with SubCluster: " + + subClusterHome); + } + + } catch (SQLException e) { + FederationStateStoreUtils + .logAndThrowRetriableException(LOG, + "Unable to insert the newly generated application " + + request.getApplicationHomeSubCluster().getApplicationId(), + e); + } finally { + // Return to the pool the CallableStatement and the Connection + FederationStateStoreUtils.returnToPool(LOG, cstmt, conn); + } + return AddApplicationHomeSubClusterResponse + .newInstance(SubClusterId.newInstance(subClusterHome)); + } + + @Override + public UpdateApplicationHomeSubClusterResponse updateApplicationHomeSubCluster( + UpdateApplicationHomeSubClusterRequest request) throws YarnException { + + // Input validator + FederationApplicationHomeSubClusterStoreInputValidator.validate(request); + + CallableStatement cstmt = null; + Connection conn = null; + + ApplicationId appId = + request.getApplicationHomeSubCluster().getApplicationId(); + SubClusterId subClusterId = + request.getApplicationHomeSubCluster().getHomeSubCluster(); + + try { + conn = getConnection(); + cstmt = conn.prepareCall(CALL_SP_UPDATE_APPLICATION_HOME_SUBCLUSTER); + + // Set the parameters for the stored procedure + cstmt.setString(1, appId.toString()); + cstmt.setString(2, subClusterId.getId()); + cstmt.registerOutParameter(3, java.sql.Types.INTEGER); + + // Execute the query + cstmt.executeUpdate(); + + // Check the ROWCOUNT value, if it is equal to 0 it means the call + // did not update the application into FederationStateStore + if (cstmt.getInt(3) == 0) { + String errMsg = "Application " + appId + " does not exist"; + FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); + } + // Check the ROWCOUNT value, if it is different from 1 it means the call + // had a wrong behavior. Maybe the database is not set correctly. + if (cstmt.getInt(3) != 1) { + String errMsg = + "Wrong behavior during the update of SubCluster " + subClusterId; + FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); + } + + LOG.info( + "Update the SubCluster to {} for application {} in the StateStore", + subClusterId, appId); + + } catch (SQLException e) { + FederationStateStoreUtils + .logAndThrowRetriableException(LOG, + "Unable to update the application " + + request.getApplicationHomeSubCluster().getApplicationId(), + e); + } finally { + // Return to the pool the CallableStatement and the Connection + FederationStateStoreUtils.returnToPool(LOG, cstmt, conn); + } + return UpdateApplicationHomeSubClusterResponse.newInstance(); + } + + @Override + public GetApplicationHomeSubClusterResponse getApplicationHomeSubCluster( + GetApplicationHomeSubClusterRequest request) throws YarnException { + // Input validator + FederationApplicationHomeSubClusterStoreInputValidator.validate(request); + + CallableStatement cstmt = null; + Connection conn = null; + + SubClusterId homeRM = null; + + try { + conn = getConnection(); + cstmt = conn.prepareCall(CALL_SP_GET_APPLICATION_HOME_SUBCLUSTER); + + // Set the parameters for the stored procedure + cstmt.setString(1, request.getApplicationId().toString()); + cstmt.registerOutParameter(2, java.sql.Types.VARCHAR); + + // Execute the query + cstmt.execute(); + + if (cstmt.getString(2) != null) { + homeRM = SubClusterId.newInstance(cstmt.getString(2)); + } else { + String errMsg = + "Application " + request.getApplicationId() + " does not exist"; + FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Got the information about the specified application " + + request.getApplicationId() + ". The AM is running in " + homeRM); + } + } catch (SQLException e) { + FederationStateStoreUtils.logAndThrowRetriableException(LOG, + "Unable to obtain the application information " + + "for the specified application " + request.getApplicationId(), + e); + } finally { + + // Return to the pool the CallableStatement and the Connection + FederationStateStoreUtils.returnToPool(LOG, cstmt, conn); + } + return GetApplicationHomeSubClusterResponse + .newInstance(ApplicationHomeSubCluster + .newInstance(request.getApplicationId(), homeRM)); + } + + @Override + public GetApplicationsHomeSubClusterResponse getApplicationsHomeSubCluster( + GetApplicationsHomeSubClusterRequest request) throws YarnException { + CallableStatement cstmt = null; + Connection conn = null; + ResultSet rs = null; + List appsHomeSubClusters = + new ArrayList(); + + try { + conn = getConnection(); + cstmt = conn.prepareCall(CALL_SP_GET_APPLICATIONS_HOME_SUBCLUSTER); + + // Execute the query + rs = cstmt.executeQuery(); + + while (rs.next()) { + + // Extract the output for each tuple + String applicationId = rs.getString(1); + String homeSubCluster = rs.getString(2); + + appsHomeSubClusters.add(ApplicationHomeSubCluster.newInstance( + ApplicationId.fromString(applicationId), + SubClusterId.newInstance(homeSubCluster))); + } + + } catch (SQLException e) { + FederationStateStoreUtils.logAndThrowRetriableException(LOG, + "Unable to obtain the information for all the applications ", e); + } finally { + // Return to the pool the CallableStatement and the Connection + FederationStateStoreUtils.returnToPool(LOG, cstmt, conn, rs); + } + return GetApplicationsHomeSubClusterResponse + .newInstance(appsHomeSubClusters); + } + + @Override + public DeleteApplicationHomeSubClusterResponse deleteApplicationHomeSubCluster( + DeleteApplicationHomeSubClusterRequest request) throws YarnException { + + // Input validator + FederationApplicationHomeSubClusterStoreInputValidator.validate(request); + + CallableStatement cstmt = null; + Connection conn = null; + + try { + conn = getConnection(); + cstmt = conn.prepareCall(CALL_SP_DELETE_APPLICATION_HOME_SUBCLUSTER); + + // Set the parameters for the stored procedure + cstmt.setString(1, request.getApplicationId().toString()); + cstmt.registerOutParameter(2, java.sql.Types.INTEGER); + + // Execute the query + cstmt.executeUpdate(); + + // Check the ROWCOUNT value, if it is equal to 0 it means the call + // did not delete the application from FederationStateStore + if (cstmt.getInt(2) == 0) { + String errMsg = + "Application " + request.getApplicationId() + " does not exist"; + FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); + } + // Check the ROWCOUNT value, if it is different from 1 it means the call + // had a wrong behavior. Maybe the database is not set correctly. + if (cstmt.getInt(2) != 1) { + String errMsg = "Wrong behavior during deleting the application " + + request.getApplicationId(); + FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); + } + + LOG.info("Delete from the StateStore the application: {}", + request.getApplicationId()); + + } catch (SQLException e) { + FederationStateStoreUtils.logAndThrowRetriableException(LOG, + "Unable to delete the application " + request.getApplicationId(), e); + } finally { + // Return to the pool the CallableStatement and the Connection + FederationStateStoreUtils.returnToPool(LOG, cstmt, conn); + } + return DeleteApplicationHomeSubClusterResponse.newInstance(); + } + + @Override + public GetSubClusterPolicyConfigurationResponse getPolicyConfiguration( + GetSubClusterPolicyConfigurationRequest request) throws YarnException { + + // Input validator + FederationPolicyStoreInputValidator.validate(request); + + CallableStatement cstmt = null; + Connection conn = null; + SubClusterPolicyConfiguration subClusterPolicyConfiguration = null; + + try { + conn = getConnection(); + cstmt = conn.prepareCall(CALL_SP_GET_POLICY_CONFIGURATION); + + // Set the parameters for the stored procedure + cstmt.setString(1, request.getQueue()); + cstmt.registerOutParameter(2, java.sql.Types.VARCHAR); + cstmt.registerOutParameter(3, java.sql.Types.VARBINARY); + + // Execute the query + cstmt.executeUpdate(); + + // Check if the output it is a valid policy + if (cstmt.getString(2) != null && cstmt.getBytes(3) != null) { + subClusterPolicyConfiguration = + SubClusterPolicyConfiguration.newInstance(request.getQueue(), + cstmt.getString(2), ByteBuffer.wrap(cstmt.getBytes(3))); + if (LOG.isDebugEnabled()) { + LOG.debug("Selected from StateStore the policy for the queue: " + + subClusterPolicyConfiguration.toString()); + } + } else { + String errMsg = + "Policy for queue " + request.getQueue() + " does not exist"; + FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); + } + + } catch (SQLException e) { + FederationStateStoreUtils.logAndThrowRetriableException(LOG, + "Unable to select the policy for the queue :" + request.getQueue(), + e); + } finally { + // Return to the pool the CallableStatement and the Connection + FederationStateStoreUtils.returnToPool(LOG, cstmt, conn); + } + return GetSubClusterPolicyConfigurationResponse + .newInstance(subClusterPolicyConfiguration); + } + + @Override + public SetSubClusterPolicyConfigurationResponse setPolicyConfiguration( + SetSubClusterPolicyConfigurationRequest request) throws YarnException { + + // Input validator + FederationPolicyStoreInputValidator.validate(request); + + CallableStatement cstmt = null; + Connection conn = null; + + SubClusterPolicyConfiguration policyConf = request.getPolicyConfiguration(); + + try { + conn = getConnection(); + cstmt = conn.prepareCall(CALL_SP_SET_POLICY_CONFIGURATION); + + // Set the parameters for the stored procedure + cstmt.setString(1, policyConf.getQueue()); + cstmt.setString(2, policyConf.getType()); + cstmt.setBytes(3, getByteArray(policyConf.getParams())); + cstmt.registerOutParameter(4, java.sql.Types.INTEGER); + + // Execute the query + cstmt.executeUpdate(); + + // Check the ROWCOUNT value, if it is equal to 0 it means the call + // did not add a new policy into FederationStateStore + if (cstmt.getInt(4) == 0) { + String errMsg = "The policy " + policyConf.getQueue() + + " was not insert into the StateStore"; + FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); + } + // Check the ROWCOUNT value, if it is different from 1 it means the call + // had a wrong behavior. Maybe the database is not set correctly. + if (cstmt.getInt(4) != 1) { + String errMsg = + "Wrong behavior during insert the policy " + policyConf.getQueue(); + FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); + } + + LOG.info("Insert into the state store the policy for the queue: " + + policyConf.getQueue()); + + } catch (SQLException e) { + FederationStateStoreUtils.logAndThrowRetriableException(LOG, + "Unable to insert the newly generated policy for the queue :" + + policyConf.getQueue(), + e); + } finally { + // Return to the pool the CallableStatement and the Connection + FederationStateStoreUtils.returnToPool(LOG, cstmt, conn); + } + return SetSubClusterPolicyConfigurationResponse.newInstance(); + } + + @Override + public GetSubClusterPoliciesConfigurationsResponse getPoliciesConfigurations( + GetSubClusterPoliciesConfigurationsRequest request) throws YarnException { + + CallableStatement cstmt = null; + Connection conn = null; + ResultSet rs = null; + List policyConfigurations = + new ArrayList(); + + try { + conn = getConnection(); + cstmt = conn.prepareCall(CALL_SP_GET_POLICIES_CONFIGURATIONS); + + // Execute the query + rs = cstmt.executeQuery(); + + while (rs.next()) { + + // Extract the output for each tuple + String queue = rs.getString(1); + String type = rs.getString(2); + byte[] policyInfo = rs.getBytes(3); + + SubClusterPolicyConfiguration subClusterPolicyConfiguration = + SubClusterPolicyConfiguration.newInstance(queue, type, + ByteBuffer.wrap(policyInfo)); + policyConfigurations.add(subClusterPolicyConfiguration); + } + } catch (SQLException e) { + FederationStateStoreUtils.logAndThrowRetriableException(LOG, + "Unable to obtain the policy information for all the queues.", e); + } finally { + // Return to the pool the CallableStatement and the Connection + FederationStateStoreUtils.returnToPool(LOG, cstmt, conn, rs); + } + + return GetSubClusterPoliciesConfigurationsResponse + .newInstance(policyConfigurations); + } + + @Override + public Version getCurrentVersion() { + throw new NotImplementedException(); + } + + @Override + public Version loadVersion() { + throw new NotImplementedException(); + } + + @Override + public void close() throws Exception { + if (dataSource != null) { + dataSource.close(); + } + } + + /** + * Get a connection from the DataSource pool. + * + * @return a connection from the DataSource pool. + * @throws SQLException on failure + */ + public Connection getConnection() throws SQLException { + return dataSource.getConnection(); + } + + private static byte[] getByteArray(ByteBuffer bb) { + byte[] ba = new byte[bb.limit()]; + bb.get(ba); + return ba; + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SubClusterState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SubClusterState.java index ff49aaacea..b30bd32fd0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SubClusterState.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SubClusterState.java @@ -19,6 +19,8 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** *

@@ -61,4 +63,23 @@ public boolean isFinal() { return (this == SC_UNREGISTERED || this == SC_DECOMMISSIONED || this == SC_LOST); } + + public static final Logger LOG = + LoggerFactory.getLogger(SubClusterState.class); + + /** + * Convert a string into {@code SubClusterState}. + * + * @param x the string to convert in SubClusterState + * @return the respective {@code SubClusterState} + */ + public static SubClusterState fromString(String x) { + try { + return SubClusterState.valueOf(x); + } catch (Exception e) { + LOG.error("Invalid SubCluster State value in the StateStore does not" + + " match with the YARN Federation standard."); + return null; + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationApplicationHomeSubClusterStoreInputValidator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationApplicationHomeSubClusterStoreInputValidator.java index d920144e7a..0184c9fad6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationApplicationHomeSubClusterStoreInputValidator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationApplicationHomeSubClusterStoreInputValidator.java @@ -51,8 +51,7 @@ private FederationApplicationHomeSubClusterStoreInputValidator() { * against * @throws FederationStateStoreInvalidInputException if the request is invalid */ - public static void validateAddApplicationHomeSubClusterRequest( - AddApplicationHomeSubClusterRequest request) + public static void validate(AddApplicationHomeSubClusterRequest request) throws FederationStateStoreInvalidInputException { if (request == null) { String message = "Missing AddApplicationHomeSubCluster Request." @@ -75,8 +74,7 @@ public static void validateAddApplicationHomeSubClusterRequest( * validate against * @throws FederationStateStoreInvalidInputException if the request is invalid */ - public static void validateUpdateApplicationHomeSubClusterRequest( - UpdateApplicationHomeSubClusterRequest request) + public static void validate(UpdateApplicationHomeSubClusterRequest request) throws FederationStateStoreInvalidInputException { if (request == null) { String message = "Missing UpdateApplicationHomeSubCluster Request." @@ -99,8 +97,7 @@ public static void validateUpdateApplicationHomeSubClusterRequest( * against * @throws FederationStateStoreInvalidInputException if the request is invalid */ - public static void validateGetApplicationHomeSubClusterRequest( - GetApplicationHomeSubClusterRequest request) + public static void validate(GetApplicationHomeSubClusterRequest request) throws FederationStateStoreInvalidInputException { if (request == null) { String message = "Missing GetApplicationHomeSubCluster Request." @@ -122,8 +119,7 @@ public static void validateGetApplicationHomeSubClusterRequest( * validate against * @throws FederationStateStoreInvalidInputException if the request is invalid */ - public static void validateDeleteApplicationHomeSubClusterRequest( - DeleteApplicationHomeSubClusterRequest request) + public static void validate(DeleteApplicationHomeSubClusterRequest request) throws FederationStateStoreInvalidInputException { if (request == null) { String message = "Missing DeleteApplicationHomeSubCluster Request." diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationMembershipStateStoreInputValidator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationMembershipStateStoreInputValidator.java index ebe622b96b..0ec8e5de6f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationMembershipStateStoreInputValidator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationMembershipStateStoreInputValidator.java @@ -53,8 +53,7 @@ private FederationMembershipStateStoreInputValidator() { * @param request the {@link SubClusterRegisterRequest} to validate against * @throws FederationStateStoreInvalidInputException if the request is invalid */ - public static void validateSubClusterRegisterRequest( - SubClusterRegisterRequest request) + public static void validate(SubClusterRegisterRequest request) throws FederationStateStoreInvalidInputException { // check if the request is present @@ -79,8 +78,7 @@ public static void validateSubClusterRegisterRequest( * @param request the {@link SubClusterDeregisterRequest} to validate against * @throws FederationStateStoreInvalidInputException if the request is invalid */ - public static void validateSubClusterDeregisterRequest( - SubClusterDeregisterRequest request) + public static void validate(SubClusterDeregisterRequest request) throws FederationStateStoreInvalidInputException { // check if the request is present @@ -111,8 +109,7 @@ public static void validateSubClusterDeregisterRequest( * @param request the {@link SubClusterHeartbeatRequest} to validate against * @throws FederationStateStoreInvalidInputException if the request is invalid */ - public static void validateSubClusterHeartbeatRequest( - SubClusterHeartbeatRequest request) + public static void validate(SubClusterHeartbeatRequest request) throws FederationStateStoreInvalidInputException { // check if the request is present @@ -143,8 +140,7 @@ public static void validateSubClusterHeartbeatRequest( * @param request the {@link GetSubClusterInfoRequest} to validate against * @throws FederationStateStoreInvalidInputException if the request is invalid */ - public static void validateGetSubClusterInfoRequest( - GetSubClusterInfoRequest request) + public static void validate(GetSubClusterInfoRequest request) throws FederationStateStoreInvalidInputException { // check if the request is present @@ -169,7 +165,7 @@ public static void validateGetSubClusterInfoRequest( * @throws FederationStateStoreInvalidInputException if the SubCluster Info * are invalid */ - private static void checkSubClusterInfo(SubClusterInfo subClusterInfo) + public static void checkSubClusterInfo(SubClusterInfo subClusterInfo) throws FederationStateStoreInvalidInputException { if (subClusterInfo == null) { String message = "Missing SubCluster Information." diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationPolicyStoreInputValidator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationPolicyStoreInputValidator.java index 0df2d85856..3c68bfdace 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationPolicyStoreInputValidator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationPolicyStoreInputValidator.java @@ -48,8 +48,7 @@ private FederationPolicyStoreInputValidator() { * validate against * @throws FederationStateStoreInvalidInputException if the request is invalid */ - public static void validateGetSubClusterPolicyConfigurationRequest( - GetSubClusterPolicyConfigurationRequest request) + public static void validate(GetSubClusterPolicyConfigurationRequest request) throws FederationStateStoreInvalidInputException { if (request == null) { String message = "Missing GetSubClusterPolicyConfiguration Request." @@ -72,8 +71,7 @@ public static void validateGetSubClusterPolicyConfigurationRequest( * validate against * @throws FederationStateStoreInvalidInputException if the request is invalid */ - public static void validateSetSubClusterPolicyConfigurationRequest( - SetSubClusterPolicyConfigurationRequest request) + public static void validate(SetSubClusterPolicyConfigurationRequest request) throws FederationStateStoreInvalidInputException { if (request == null) { String message = "Missing SetSubClusterPolicyConfiguration Request." diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationStateStoreUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationStateStoreUtils.java index 7dbb20aaf0..3b870debef 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationStateStoreUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationStateStoreUtils.java @@ -20,16 +20,18 @@ import java.sql.CallableStatement; import java.sql.Connection; +import java.sql.ResultSet; import java.sql.SQLException; import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreErrorCode; import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreException; import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreInvalidInputException; import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreRetriableException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.zaxxer.hikari.HikariDataSource; + /** * Common utility methods used by the store implementations. * @@ -39,19 +41,22 @@ public final class FederationStateStoreUtils { public static final Logger LOG = LoggerFactory.getLogger(FederationStateStoreUtils.class); + public final static String FEDERATION_STORE_URL = "url"; + private FederationStateStoreUtils() { } /** - * Returns the SQL FederationStateStore connection to the pool. + * Returns the SQL FederationStateStore connections to the pool. * * @param log the logger interface * @param cstmt the interface used to execute SQL stored procedures * @param conn the SQL connection + * @param rs the ResultSet interface used to execute SQL stored procedures * @throws YarnException on failure */ public static void returnToPool(Logger log, CallableStatement cstmt, - Connection conn) throws YarnException { + Connection conn, ResultSet rs) throws YarnException { if (cstmt != null) { try { cstmt.close(); @@ -69,6 +74,28 @@ public static void returnToPool(Logger log, CallableStatement cstmt, e); } } + + if (rs != null) { + try { + rs.close(); + } catch (SQLException e) { + logAndThrowException(log, "Exception while trying to close ResultSet", + e); + } + } + } + + /** + * Returns the SQL FederationStateStore connections to the pool. + * + * @param log the logger interface + * @param cstmt the interface used to execute SQL stored procedures + * @param conn the SQL connection + * @throws YarnException on failure + */ + public static void returnToPool(Logger log, CallableStatement cstmt, + Connection conn) throws YarnException { + returnToPool(log, cstmt, conn, null); } /** @@ -95,28 +122,13 @@ public static void logAndThrowException(Logger log, String errMsg, * FederationStateStore. * * @param log the logger interface - * @param code FederationStateStoreErrorCode of the error * @param errMsg the error message * @throws YarnException on failure */ - public static void logAndThrowStoreException(Logger log, - FederationStateStoreErrorCode code, String errMsg) throws YarnException { - log.error(errMsg + " " + code.toString()); - throw new FederationStateStoreException(code); - } - - /** - * Throws an FederationStateStoreException due to an error in - * FederationStateStore. - * - * @param log the logger interface - * @param code FederationStateStoreErrorCode of the error - * @throws YarnException on failure - */ - public static void logAndThrowStoreException(Logger log, - FederationStateStoreErrorCode code) throws YarnException { - log.error(code.toString()); - throw new FederationStateStoreException(code); + public static void logAndThrowStoreException(Logger log, String errMsg) + throws YarnException { + log.error(errMsg); + throw new FederationStateStoreException(errMsg); } /** @@ -129,7 +141,7 @@ public static void logAndThrowStoreException(Logger log, */ public static void logAndThrowInvalidInputException(Logger log, String errMsg) throws YarnException { - LOG.error(errMsg); + log.error(errMsg); throw new FederationStateStoreInvalidInputException(errMsg); } @@ -145,11 +157,58 @@ public static void logAndThrowInvalidInputException(Logger log, String errMsg) public static void logAndThrowRetriableException(Logger log, String errMsg, Throwable t) throws YarnException { if (t != null) { - LOG.error(errMsg, t); + log.error(errMsg, t); throw new FederationStateStoreRetriableException(errMsg, t); } else { - LOG.error(errMsg); + log.error(errMsg); throw new FederationStateStoreRetriableException(errMsg); } } + + /** + * Sets a specific value for a specific property of + * HikariDataSource SQL connections. + * + * @param dataSource the HikariDataSource connections + * @param property the property to set + * @param value the value to set + */ + public static void setProperty(HikariDataSource dataSource, String property, + String value) { + LOG.debug("Setting property {} with value {}", property, value); + if (property != null && !property.isEmpty() && value != null) { + dataSource.addDataSourceProperty(property, value); + } + } + + /** + * Sets a specific username for HikariDataSource SQL connections. + * + * @param dataSource the HikariDataSource connections + * @param userNameDB the value to set + */ + public static void setUsername(HikariDataSource dataSource, + String userNameDB) { + if (userNameDB != null) { + dataSource.setUsername(userNameDB); + LOG.debug("Setting non NULL Username for Store connection"); + } else { + LOG.debug("NULL Username specified for Store connection, so ignoring"); + } + } + + /** + * Sets a specific password for HikariDataSource SQL connections. + * + * @param dataSource the HikariDataSource connections + * @param password the value to set + */ + public static void setPassword(HikariDataSource dataSource, String password) { + if (password != null) { + dataSource.setPassword(password); + LOG.debug("Setting non NULL Credentials for Store connection"); + } else { + LOG.debug("NULL Credentials specified for Store connection, so ignoring"); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java index 80b00ef79d..db04592e3a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java @@ -19,13 +19,14 @@ import java.io.IOException; import java.nio.ByteBuffer; +import java.util.Calendar; import java.util.List; +import java.util.TimeZone; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; -import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreErrorCode; import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreException; import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest; import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterResponse; @@ -87,13 +88,26 @@ public void after() throws Exception { @Test public void testRegisterSubCluster() throws Exception { SubClusterId subClusterId = SubClusterId.newInstance("SC"); + SubClusterInfo subClusterInfo = createSubClusterInfo(subClusterId); + long previousTimeStamp = + Calendar.getInstance(TimeZone.getTimeZone("UTC")).getTimeInMillis(); + SubClusterRegisterResponse result = stateStore.registerSubCluster( SubClusterRegisterRequest.newInstance(subClusterInfo)); + long currentTimeStamp = + Calendar.getInstance(TimeZone.getTimeZone("UTC")).getTimeInMillis(); + Assert.assertNotNull(result); Assert.assertEquals(subClusterInfo, querySubClusterInfo(subClusterId)); + + // The saved heartbeat is between the old one and the current timestamp + Assert.assertTrue(querySubClusterInfo(subClusterId) + .getLastHeartBeat() <= currentTimeStamp); + Assert.assertTrue(querySubClusterInfo(subClusterId) + .getLastHeartBeat() >= previousTimeStamp); } @Test @@ -120,9 +134,7 @@ public void testDeregisterSubClusterUnknownSubCluster() throws Exception { stateStore.deregisterSubCluster(deregisterRequest); Assert.fail(); } catch (FederationStateStoreException e) { - Assert.assertEquals( - FederationStateStoreErrorCode.MEMBERSHIP_UPDATE_DEREGISTER_FAIL, - e.getCode()); + Assert.assertTrue(e.getMessage().startsWith("SubCluster SC not found")); } } @@ -149,9 +161,8 @@ public void testGetSubClusterInfoUnknownSubCluster() throws Exception { stateStore.getSubCluster(request).getSubClusterInfo(); Assert.fail(); } catch (FederationStateStoreException e) { - Assert.assertEquals( - FederationStateStoreErrorCode.MEMBERSHIP_SINGLE_SELECT_FAIL, - e.getCode()); + Assert.assertTrue( + e.getMessage().startsWith("SubCluster SC does not exist")); } } @@ -200,13 +211,24 @@ public void testSubClusterHeartbeat() throws Exception { SubClusterId subClusterId = SubClusterId.newInstance("SC"); registerSubCluster(createSubClusterInfo(subClusterId)); + long previousHeartBeat = + querySubClusterInfo(subClusterId).getLastHeartBeat(); + SubClusterHeartbeatRequest heartbeatRequest = SubClusterHeartbeatRequest .newInstance(subClusterId, SubClusterState.SC_RUNNING, "capability"); stateStore.subClusterHeartbeat(heartbeatRequest); + long currentTimeStamp = + Calendar.getInstance(TimeZone.getTimeZone("UTC")).getTimeInMillis(); + Assert.assertEquals(SubClusterState.SC_RUNNING, querySubClusterInfo(subClusterId).getState()); - Assert.assertNotNull(querySubClusterInfo(subClusterId).getLastHeartBeat()); + + // The saved heartbeat is between the old one and the current timestamp + Assert.assertTrue(querySubClusterInfo(subClusterId) + .getLastHeartBeat() <= currentTimeStamp); + Assert.assertTrue(querySubClusterInfo(subClusterId) + .getLastHeartBeat() >= previousHeartBeat); } @Test @@ -219,9 +241,8 @@ public void testSubClusterHeartbeatUnknownSubCluster() throws Exception { stateStore.subClusterHeartbeat(heartbeatRequest); Assert.fail(); } catch (FederationStateStoreException e) { - Assert.assertEquals( - FederationStateStoreErrorCode.MEMBERSHIP_UPDATE_HEARTBEAT_FAIL, - e.getCode()); + Assert.assertTrue(e.getMessage() + .startsWith("SubCluster SC does not exist; cannot heartbeat")); } } @@ -281,9 +302,8 @@ public void testDeleteApplicationHomeSubCluster() throws Exception { queryApplicationHomeSC(appId); Assert.fail(); } catch (FederationStateStoreException e) { - Assert.assertEquals( - FederationStateStoreErrorCode.APPLICATIONS_SINGLE_SELECT_FAIL, - e.getCode()); + Assert.assertTrue(e.getMessage() + .startsWith("Application " + appId + " does not exist")); } } @@ -298,8 +318,8 @@ public void testDeleteApplicationHomeSubClusterUnknownApp() throws Exception { stateStore.deleteApplicationHomeSubCluster(delRequest); Assert.fail(); } catch (FederationStateStoreException e) { - Assert.assertEquals( - FederationStateStoreErrorCode.APPLICATIONS_DELETE_FAIL, e.getCode()); + Assert.assertTrue(e.getMessage() + .startsWith("Application " + appId.toString() + " does not exist")); } } @@ -331,9 +351,8 @@ public void testGetApplicationHomeSubClusterUnknownApp() throws Exception { stateStore.getApplicationHomeSubCluster(request); Assert.fail(); } catch (FederationStateStoreException e) { - Assert.assertEquals( - FederationStateStoreErrorCode.APPLICATIONS_SINGLE_SELECT_FAIL, - e.getCode()); + Assert.assertTrue(e.getMessage() + .startsWith("Application " + appId.toString() + " does not exist")); } } @@ -397,8 +416,8 @@ public void testUpdateApplicationHomeSubClusterUnknownApp() throws Exception { stateStore.updateApplicationHomeSubCluster((updateRequest)); Assert.fail(); } catch (FederationStateStoreException e) { - Assert.assertEquals( - FederationStateStoreErrorCode.APPLICATIONS_UPDATE_FAIL, e.getCode()); + Assert.assertTrue(e.getMessage() + .startsWith("Application " + appId.toString() + " does not exist")); } } @@ -458,8 +477,8 @@ public void testGetPolicyConfigurationUnknownQueue() throws Exception { stateStore.getPolicyConfiguration(request); Assert.fail(); } catch (FederationStateStoreException e) { - Assert.assertEquals( - FederationStateStoreErrorCode.POLICY_SINGLE_SELECT_FAIL, e.getCode()); + Assert.assertTrue( + e.getMessage().startsWith("Policy for queue Queue does not exist")); } } @@ -499,8 +518,9 @@ private SubClusterInfo createSubClusterInfo(SubClusterId subClusterId) { private SubClusterPolicyConfiguration createSCPolicyConf(String queueName, String policyType) { - return SubClusterPolicyConfiguration.newInstance(queueName, policyType, - ByteBuffer.allocate(1)); + ByteBuffer bb = ByteBuffer.allocate(100); + bb.put((byte) 0x02); + return SubClusterPolicyConfiguration.newInstance(queueName, policyType, bb); } private void addApplicationHomeSC(ApplicationId appId, @@ -558,4 +578,8 @@ protected void setConf(Configuration conf) { this.conf = conf; } + protected Configuration getConf() { + return conf; + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/HSQLDBFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/HSQLDBFederationStateStore.java new file mode 100644 index 0000000000..289a3a6112 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/HSQLDBFederationStateStore.java @@ -0,0 +1,252 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.federation.store.impl; + +import java.sql.Connection; +import java.sql.SQLException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * HSQLDB implementation of {@link FederationStateStore}. + */ +public class HSQLDBFederationStateStore extends SQLFederationStateStore { + + private static final Logger LOG = + LoggerFactory.getLogger(HSQLDBFederationStateStore.class); + + private Connection conn; + + private static final String TABLE_APPLICATIONSHOMESUBCLUSTER = + " CREATE TABLE applicationsHomeSubCluster (" + + " applicationId varchar(64) NOT NULL," + + " homeSubCluster varchar(256) NOT NULL," + + " CONSTRAINT pk_applicationId PRIMARY KEY (applicationId))"; + + private static final String TABLE_MEMBERSHIP = + "CREATE TABLE membership ( subClusterId varchar(256) NOT NULL," + + " amRMServiceAddress varchar(256) NOT NULL," + + " clientRMServiceAddress varchar(256) NOT NULL," + + " rmAdminServiceAddress varchar(256) NOT NULL," + + " rmWebServiceAddress varchar(256) NOT NULL," + + " lastHeartBeat datetime NOT NULL, state varchar(32) NOT NULL," + + " lastStartTime bigint NULL, capability varchar(6000) NOT NULL," + + " CONSTRAINT pk_subClusterId PRIMARY KEY (subClusterId))"; + + private static final String TABLE_POLICIES = + "CREATE TABLE policies ( queue varchar(256) NOT NULL," + + " policyType varchar(256) NOT NULL, params varbinary(512)," + + " CONSTRAINT pk_queue PRIMARY KEY (queue))"; + + private static final String SP_REGISTERSUBCLUSTER = + "CREATE PROCEDURE sp_registerSubCluster(" + + " IN subClusterId_IN varchar(256)," + + " IN amRMServiceAddress_IN varchar(256)," + + " IN clientRMServiceAddress_IN varchar(256)," + + " IN rmAdminServiceAddress_IN varchar(256)," + + " IN rmWebServiceAddress_IN varchar(256)," + + " IN state_IN varchar(256)," + + " IN lastStartTime_IN bigint, IN capability_IN varchar(6000)," + + " OUT rowCount_OUT int)MODIFIES SQL DATA BEGIN ATOMIC" + + " DELETE FROM membership WHERE (subClusterId = subClusterId_IN);" + + " INSERT INTO membership ( subClusterId," + + " amRMServiceAddress, clientRMServiceAddress," + + " rmAdminServiceAddress, rmWebServiceAddress," + + " lastHeartBeat, state, lastStartTime," + + " capability) VALUES ( subClusterId_IN," + + " amRMServiceAddress_IN, clientRMServiceAddress_IN," + + " rmAdminServiceAddress_IN, rmWebServiceAddress_IN," + + " NOW() AT TIME ZONE INTERVAL '0:00' HOUR TO MINUTE," + + " state_IN, lastStartTime_IN, capability_IN);" + + " GET DIAGNOSTICS rowCount_OUT = ROW_COUNT; END"; + + private static final String SP_DEREGISTERSUBCLUSTER = + "CREATE PROCEDURE sp_deregisterSubCluster(" + + " IN subClusterId_IN varchar(256)," + + " IN state_IN varchar(64), OUT rowCount_OUT int)" + + " MODIFIES SQL DATA BEGIN ATOMIC" + + " UPDATE membership SET state = state_IN WHERE (" + + " subClusterId = subClusterId_IN AND state != state_IN);" + + " GET DIAGNOSTICS rowCount_OUT = ROW_COUNT; END"; + + private static final String SP_SUBCLUSTERHEARTBEAT = + "CREATE PROCEDURE sp_subClusterHeartbeat(" + + " IN subClusterId_IN varchar(256), IN state_IN varchar(64)," + + " IN capability_IN varchar(6000), OUT rowCount_OUT int)" + + " MODIFIES SQL DATA BEGIN ATOMIC UPDATE membership" + + " SET capability = capability_IN, state = state_IN," + + " lastHeartBeat = NOW() AT TIME ZONE INTERVAL '0:00'" + + " HOUR TO MINUTE WHERE subClusterId = subClusterId_IN;" + + " GET DIAGNOSTICS rowCount_OUT = ROW_COUNT; END"; + + private static final String SP_GETSUBCLUSTER = + "CREATE PROCEDURE sp_getSubCluster( IN subClusterId_IN varchar(256)," + + " OUT amRMServiceAddress_OUT varchar(256)," + + " OUT clientRMServiceAddress_OUT varchar(256)," + + " OUT rmAdminServiceAddress_OUT varchar(256)," + + " OUT rmWebServiceAddress_OUT varchar(256)," + + " OUT lastHeartBeat_OUT datetime, OUT state_OUT varchar(64)," + + " OUT lastStartTime_OUT bigint," + + " OUT capability_OUT varchar(6000))" + + " MODIFIES SQL DATA BEGIN ATOMIC SELECT amRMServiceAddress," + + " clientRMServiceAddress," + + " rmAdminServiceAddress, rmWebServiceAddress," + + " lastHeartBeat, state, lastStartTime, capability" + + " INTO amRMServiceAddress_OUT, clientRMServiceAddress_OUT," + + " rmAdminServiceAddress_OUT," + + " rmWebServiceAddress_OUT, lastHeartBeat_OUT," + + " state_OUT, lastStartTime_OUT, capability_OUT" + + " FROM membership WHERE subClusterId = subClusterId_IN; END"; + + private static final String SP_GETSUBCLUSTERS = + "CREATE PROCEDURE sp_getSubClusters()" + + " MODIFIES SQL DATA DYNAMIC RESULT SETS 1 BEGIN ATOMIC" + + " DECLARE result CURSOR FOR" + + " SELECT subClusterId, amRMServiceAddress, clientRMServiceAddress," + + " rmAdminServiceAddress, rmWebServiceAddress, lastHeartBeat," + + " state, lastStartTime, capability" + + " FROM membership; OPEN result; END"; + + private static final String SP_ADDAPPLICATIONHOMESUBCLUSTER = + "CREATE PROCEDURE sp_addApplicationHomeSubCluster(" + + " IN applicationId_IN varchar(64)," + + " IN homeSubCluster_IN varchar(256)," + + " OUT storedHomeSubCluster_OUT varchar(256), OUT rowCount_OUT int)" + + " MODIFIES SQL DATA BEGIN ATOMIC" + + " INSERT INTO applicationsHomeSubCluster " + + " (applicationId,homeSubCluster) " + + " (SELECT applicationId_IN, homeSubCluster_IN" + + " FROM applicationsHomeSubCluster" + + " WHERE applicationId = applicationId_IN" + + " HAVING COUNT(*) = 0 );" + + " GET DIAGNOSTICS rowCount_OUT = ROW_COUNT;" + + " SELECT homeSubCluster INTO storedHomeSubCluster_OUT" + + " FROM applicationsHomeSubCluster" + + " WHERE applicationId = applicationID_IN; END"; + + private static final String SP_UPDATEAPPLICATIONHOMESUBCLUSTER = + "CREATE PROCEDURE sp_updateApplicationHomeSubCluster(" + + " IN applicationId_IN varchar(64)," + + " IN homeSubCluster_IN varchar(256), OUT rowCount_OUT int)" + + " MODIFIES SQL DATA BEGIN ATOMIC" + + " UPDATE applicationsHomeSubCluster" + + " SET homeSubCluster = homeSubCluster_IN" + + " WHERE applicationId = applicationId_IN;" + + " GET DIAGNOSTICS rowCount_OUT = ROW_COUNT; END"; + + private static final String SP_GETAPPLICATIONHOMESUBCLUSTER = + "CREATE PROCEDURE sp_getApplicationHomeSubCluster(" + + " IN applicationId_IN varchar(64)," + + " OUT homeSubCluster_OUT varchar(256))" + + " MODIFIES SQL DATA BEGIN ATOMIC" + + " SELECT homeSubCluster INTO homeSubCluster_OUT" + + " FROM applicationsHomeSubCluster" + + " WHERE applicationId = applicationID_IN; END"; + + private static final String SP_GETAPPLICATIONSHOMESUBCLUSTER = + "CREATE PROCEDURE sp_getApplicationsHomeSubCluster()" + + " MODIFIES SQL DATA DYNAMIC RESULT SETS 1 BEGIN ATOMIC" + + " DECLARE result CURSOR FOR" + + " SELECT applicationId, homeSubCluster" + + " FROM applicationsHomeSubCluster; OPEN result; END"; + + private static final String SP_DELETEAPPLICATIONHOMESUBCLUSTER = + "CREATE PROCEDURE sp_deleteApplicationHomeSubCluster(" + + " IN applicationId_IN varchar(64), OUT rowCount_OUT int)" + + " MODIFIES SQL DATA BEGIN ATOMIC" + + " DELETE FROM applicationsHomeSubCluster" + + " WHERE applicationId = applicationId_IN;" + + " GET DIAGNOSTICS rowCount_OUT = ROW_COUNT; END"; + + private static final String SP_SETPOLICYCONFIGURATION = + "CREATE PROCEDURE sp_setPolicyConfiguration(" + + " IN queue_IN varchar(256), IN policyType_IN varchar(256)," + + " IN params_IN varbinary(512), OUT rowCount_OUT int)" + + " MODIFIES SQL DATA BEGIN ATOMIC" + + " DELETE FROM policies WHERE queue = queue_IN;" + + " INSERT INTO policies (queue, policyType, params)" + + " VALUES (queue_IN, policyType_IN, params_IN);" + + " GET DIAGNOSTICS rowCount_OUT = ROW_COUNT; END"; + + private static final String SP_GETPOLICYCONFIGURATION = + "CREATE PROCEDURE sp_getPolicyConfiguration(" + + " IN queue_IN varchar(256), OUT policyType_OUT varchar(256)," + + " OUT params_OUT varbinary(512)) MODIFIES SQL DATA BEGIN ATOMIC" + + " SELECT policyType, params INTO policyType_OUT, params_OUT" + + " FROM policies WHERE queue = queue_IN; END"; + + private static final String SP_GETPOLICIESCONFIGURATIONS = + "CREATE PROCEDURE sp_getPoliciesConfigurations()" + + " MODIFIES SQL DATA DYNAMIC RESULT SETS 1 BEGIN ATOMIC" + + " DECLARE result CURSOR FOR" + + " SELECT * FROM policies; OPEN result; END"; + + @Override + public void init(Configuration conf) { + try { + super.init(conf); + } catch (YarnException e1) { + LOG.error("ERROR: failed to init HSQLDB " + e1.getMessage()); + } + try { + conn = getConnection(); + + LOG.info("Database Init: Start"); + + conn.prepareStatement(TABLE_APPLICATIONSHOMESUBCLUSTER).execute(); + conn.prepareStatement(TABLE_MEMBERSHIP).execute(); + conn.prepareStatement(TABLE_POLICIES).execute(); + + conn.prepareStatement(SP_REGISTERSUBCLUSTER).execute(); + conn.prepareStatement(SP_DEREGISTERSUBCLUSTER).execute(); + conn.prepareStatement(SP_SUBCLUSTERHEARTBEAT).execute(); + conn.prepareStatement(SP_GETSUBCLUSTER).execute(); + conn.prepareStatement(SP_GETSUBCLUSTERS).execute(); + + conn.prepareStatement(SP_ADDAPPLICATIONHOMESUBCLUSTER).execute(); + conn.prepareStatement(SP_UPDATEAPPLICATIONHOMESUBCLUSTER).execute(); + conn.prepareStatement(SP_GETAPPLICATIONHOMESUBCLUSTER).execute(); + conn.prepareStatement(SP_GETAPPLICATIONSHOMESUBCLUSTER).execute(); + conn.prepareStatement(SP_DELETEAPPLICATIONHOMESUBCLUSTER).execute(); + + conn.prepareStatement(SP_SETPOLICYCONFIGURATION).execute(); + conn.prepareStatement(SP_GETPOLICYCONFIGURATION).execute(); + conn.prepareStatement(SP_GETPOLICIESCONFIGURATIONS).execute(); + + LOG.info("Database Init: Complete"); + conn.close(); + } catch (SQLException e) { + LOG.error("ERROR: failed to inizialize HSQLDB " + e.getMessage()); + } + } + + public void closeConnection() { + try { + conn.close(); + } catch (SQLException e) { + LOG.error( + "ERROR: failed to close connection to HSQLDB DB " + e.getMessage()); + } + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestMemoryFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestMemoryFederationStateStore.java index 64adab8394..c29fc03c5b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestMemoryFederationStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestMemoryFederationStateStore.java @@ -28,7 +28,8 @@ public class TestMemoryFederationStateStore @Override protected FederationStateStore createStateStore() { - super.setConf(new Configuration()); + Configuration conf = new Configuration(); + super.setConf(conf); return new MemoryFederationStateStore(); } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestSQLFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestSQLFederationStateStore.java new file mode 100644 index 0000000000..d4e6cc53f6 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestSQLFederationStateStore.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.yarn.server.federation.store.impl; + +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; + +/** + * Unit tests for SQLFederationStateStore. + */ +public class TestSQLFederationStateStore extends FederationStateStoreBaseTest { + + private static final String HSQLDB_DRIVER = "org.hsqldb.jdbc.JDBCDataSource"; + private static final String DATABASE_URL = "jdbc:hsqldb:mem:state"; + private static final String DATABASE_USERNAME = "SA"; + private static final String DATABASE_PASSWORD = ""; + + @Override + protected FederationStateStore createStateStore() { + + YarnConfiguration conf = new YarnConfiguration(); + + conf.set(YarnConfiguration.FEDERATION_STATESTORE_SQL_JDBC_CLASS, + HSQLDB_DRIVER); + conf.set(YarnConfiguration.FEDERATION_STATESTORE_SQL_USERNAME, + DATABASE_USERNAME); + conf.set(YarnConfiguration.FEDERATION_STATESTORE_SQL_PASSWORD, + DATABASE_PASSWORD); + conf.set(YarnConfiguration.FEDERATION_STATESTORE_SQL_URL, + DATABASE_URL + System.currentTimeMillis()); + super.setConf(conf); + return new HSQLDBFederationStateStore(); + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/utils/TestFederationStateStoreInputValidator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/utils/TestFederationStateStoreInputValidator.java index 8ac5e81e27..5a5703e6f6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/utils/TestFederationStateStoreInputValidator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/utils/TestFederationStateStoreInputValidator.java @@ -145,7 +145,7 @@ public void testValidateSubClusterRegisterRequest() { SubClusterRegisterRequest request = SubClusterRegisterRequest.newInstance(subClusterInfo); FederationMembershipStateStoreInputValidator - .validateSubClusterRegisterRequest(request); + .validate(request); } catch (FederationStateStoreInvalidInputException e) { Assert.fail(e.getMessage()); } @@ -155,7 +155,7 @@ public void testValidateSubClusterRegisterRequest() { try { SubClusterRegisterRequest request = null; FederationMembershipStateStoreInputValidator - .validateSubClusterRegisterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -170,7 +170,7 @@ public void testValidateSubClusterRegisterRequest() { SubClusterRegisterRequest request = SubClusterRegisterRequest.newInstance(subClusterInfo); FederationMembershipStateStoreInputValidator - .validateSubClusterRegisterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -188,7 +188,7 @@ public void testValidateSubClusterRegisterRequest() { SubClusterRegisterRequest request = SubClusterRegisterRequest.newInstance(subClusterInfo); FederationMembershipStateStoreInputValidator - .validateSubClusterRegisterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -206,7 +206,7 @@ public void testValidateSubClusterRegisterRequest() { SubClusterRegisterRequest request = SubClusterRegisterRequest.newInstance(subClusterInfo); FederationMembershipStateStoreInputValidator - .validateSubClusterRegisterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -224,7 +224,7 @@ public void testValidateSubClusterRegisterRequest() { SubClusterRegisterRequest request = SubClusterRegisterRequest.newInstance(subClusterInfo); FederationMembershipStateStoreInputValidator - .validateSubClusterRegisterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -242,7 +242,7 @@ public void testValidateSubClusterRegisterRequest() { SubClusterRegisterRequest request = SubClusterRegisterRequest.newInstance(subClusterInfo); FederationMembershipStateStoreInputValidator - .validateSubClusterRegisterRequest(request); + .validate(request); } catch (FederationStateStoreInvalidInputException e) { Assert.fail(e.getMessage()); } @@ -257,7 +257,7 @@ public void testValidateSubClusterRegisterRequest() { SubClusterRegisterRequest request = SubClusterRegisterRequest.newInstance(subClusterInfo); FederationMembershipStateStoreInputValidator - .validateSubClusterRegisterRequest(request); + .validate(request); } catch (FederationStateStoreInvalidInputException e) { Assert.fail(e.getMessage()); } @@ -276,7 +276,7 @@ public void testValidateSubClusterRegisterRequestTimestamp() { SubClusterRegisterRequest request = SubClusterRegisterRequest.newInstance(subClusterInfo); FederationMembershipStateStoreInputValidator - .validateSubClusterRegisterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -294,7 +294,7 @@ public void testValidateSubClusterRegisterRequestTimestamp() { SubClusterRegisterRequest request = SubClusterRegisterRequest.newInstance(subClusterInfo); FederationMembershipStateStoreInputValidator - .validateSubClusterRegisterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -315,7 +315,7 @@ public void testValidateSubClusterRegisterRequestAddress() { SubClusterRegisterRequest request = SubClusterRegisterRequest.newInstance(subClusterInfo); FederationMembershipStateStoreInputValidator - .validateSubClusterRegisterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -332,7 +332,7 @@ public void testValidateSubClusterRegisterRequestAddress() { SubClusterRegisterRequest request = SubClusterRegisterRequest.newInstance(subClusterInfo); FederationMembershipStateStoreInputValidator - .validateSubClusterRegisterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -350,7 +350,7 @@ public void testValidateSubClusterRegisterRequestAddress() { SubClusterRegisterRequest request = SubClusterRegisterRequest.newInstance(subClusterInfo); FederationMembershipStateStoreInputValidator - .validateSubClusterRegisterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -368,7 +368,7 @@ public void testValidateSubClusterRegisterRequestAddress() { SubClusterRegisterRequest request = SubClusterRegisterRequest.newInstance(subClusterInfo); FederationMembershipStateStoreInputValidator - .validateSubClusterRegisterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -386,7 +386,7 @@ public void testValidateSubClusterRegisterRequestAddress() { SubClusterRegisterRequest request = SubClusterRegisterRequest.newInstance(subClusterInfo); FederationMembershipStateStoreInputValidator - .validateSubClusterRegisterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -404,7 +404,7 @@ public void testValidateSubClusterRegisterRequestAddress() { SubClusterRegisterRequest request = SubClusterRegisterRequest.newInstance(subClusterInfo); FederationMembershipStateStoreInputValidator - .validateSubClusterRegisterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -421,7 +421,7 @@ public void testValidateSubClusterRegisterRequestAddress() { SubClusterRegisterRequest request = SubClusterRegisterRequest.newInstance(subClusterInfo); FederationMembershipStateStoreInputValidator - .validateSubClusterRegisterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -438,7 +438,7 @@ public void testValidateSubClusterRegisterRequestAddress() { SubClusterRegisterRequest request = SubClusterRegisterRequest.newInstance(subClusterInfo); FederationMembershipStateStoreInputValidator - .validateSubClusterRegisterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -460,7 +460,7 @@ public void testValidateSubClusterRegisterRequestAddressInvalid() { SubClusterRegisterRequest request = SubClusterRegisterRequest.newInstance(subClusterInfo); FederationMembershipStateStoreInputValidator - .validateSubClusterRegisterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -477,7 +477,7 @@ public void testValidateSubClusterRegisterRequestAddressInvalid() { SubClusterRegisterRequest request = SubClusterRegisterRequest.newInstance(subClusterInfo); FederationMembershipStateStoreInputValidator - .validateSubClusterRegisterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -494,7 +494,7 @@ public void testValidateSubClusterRegisterRequestAddressInvalid() { SubClusterRegisterRequest request = SubClusterRegisterRequest.newInstance(subClusterInfo); FederationMembershipStateStoreInputValidator - .validateSubClusterRegisterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -510,7 +510,7 @@ public void testValidateSubClusterRegisterRequestAddressInvalid() { SubClusterRegisterRequest request = SubClusterRegisterRequest.newInstance(subClusterInfo); FederationMembershipStateStoreInputValidator - .validateSubClusterRegisterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -526,7 +526,7 @@ public void testValidateSubClusterRegisterRequestAddressInvalid() { SubClusterRegisterRequest request = SubClusterRegisterRequest.newInstance(subClusterInfo); FederationMembershipStateStoreInputValidator - .validateSubClusterRegisterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -543,7 +543,7 @@ public void testValidateSubClusterRegisterRequestAddressInvalid() { SubClusterRegisterRequest request = SubClusterRegisterRequest.newInstance(subClusterInfo); FederationMembershipStateStoreInputValidator - .validateSubClusterRegisterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -560,7 +560,7 @@ public void testValidateSubClusterRegisterRequestAddressInvalid() { SubClusterRegisterRequest request = SubClusterRegisterRequest.newInstance(subClusterInfo); FederationMembershipStateStoreInputValidator - .validateSubClusterRegisterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -576,7 +576,7 @@ public void testValidateSubClusterRegisterRequestAddressInvalid() { SubClusterRegisterRequest request = SubClusterRegisterRequest.newInstance(subClusterInfo); FederationMembershipStateStoreInputValidator - .validateSubClusterRegisterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -594,7 +594,7 @@ public void testValidateSubClusterDeregisterRequest() { SubClusterDeregisterRequest request = SubClusterDeregisterRequest.newInstance(subClusterId, stateLost); FederationMembershipStateStoreInputValidator - .validateSubClusterDeregisterRequest(request); + .validate(request); } catch (FederationStateStoreInvalidInputException e) { Assert.fail(e.getMessage()); } @@ -604,7 +604,7 @@ public void testValidateSubClusterDeregisterRequest() { try { SubClusterDeregisterRequest request = null; FederationMembershipStateStoreInputValidator - .validateSubClusterDeregisterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -618,7 +618,7 @@ public void testValidateSubClusterDeregisterRequest() { SubClusterDeregisterRequest request = SubClusterDeregisterRequest.newInstance(subClusterIdNull, stateLost); FederationMembershipStateStoreInputValidator - .validateSubClusterDeregisterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -632,7 +632,7 @@ public void testValidateSubClusterDeregisterRequest() { SubClusterDeregisterRequest request = SubClusterDeregisterRequest .newInstance(subClusterIdInvalid, stateLost); FederationMembershipStateStoreInputValidator - .validateSubClusterDeregisterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -646,7 +646,7 @@ public void testValidateSubClusterDeregisterRequest() { SubClusterDeregisterRequest request = SubClusterDeregisterRequest.newInstance(subClusterId, stateNull); FederationMembershipStateStoreInputValidator - .validateSubClusterDeregisterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -660,7 +660,7 @@ public void testValidateSubClusterDeregisterRequest() { SubClusterDeregisterRequest request = SubClusterDeregisterRequest.newInstance(subClusterId, stateNew); FederationMembershipStateStoreInputValidator - .validateSubClusterDeregisterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -677,7 +677,7 @@ public void testSubClusterHeartbeatRequest() { SubClusterHeartbeatRequest request = SubClusterHeartbeatRequest .newInstance(subClusterId, lastHeartBeat, stateLost, capability); FederationMembershipStateStoreInputValidator - .validateSubClusterHeartbeatRequest(request); + .validate(request); } catch (FederationStateStoreInvalidInputException e) { Assert.fail(e.getMessage()); } @@ -687,7 +687,7 @@ public void testSubClusterHeartbeatRequest() { try { SubClusterHeartbeatRequest request = null; FederationMembershipStateStoreInputValidator - .validateSubClusterHeartbeatRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -701,7 +701,7 @@ public void testSubClusterHeartbeatRequest() { SubClusterHeartbeatRequest request = SubClusterHeartbeatRequest .newInstance(subClusterIdNull, lastHeartBeat, stateLost, capability); FederationMembershipStateStoreInputValidator - .validateSubClusterHeartbeatRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -716,7 +716,7 @@ public void testSubClusterHeartbeatRequest() { SubClusterHeartbeatRequest.newInstance(subClusterIdInvalid, lastHeartBeat, stateLost, capability); FederationMembershipStateStoreInputValidator - .validateSubClusterHeartbeatRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -730,7 +730,7 @@ public void testSubClusterHeartbeatRequest() { SubClusterHeartbeatRequest request = SubClusterHeartbeatRequest .newInstance(subClusterId, lastHeartBeat, stateNull, capability); FederationMembershipStateStoreInputValidator - .validateSubClusterHeartbeatRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -745,7 +745,7 @@ public void testSubClusterHeartbeatRequest() { SubClusterHeartbeatRequest.newInstance(subClusterId, lastHeartBeatNegative, stateLost, capability); FederationMembershipStateStoreInputValidator - .validateSubClusterHeartbeatRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -759,7 +759,7 @@ public void testSubClusterHeartbeatRequest() { SubClusterHeartbeatRequest request = SubClusterHeartbeatRequest .newInstance(subClusterId, lastHeartBeat, stateLost, capabilityNull); FederationMembershipStateStoreInputValidator - .validateSubClusterHeartbeatRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -773,7 +773,7 @@ public void testSubClusterHeartbeatRequest() { SubClusterHeartbeatRequest request = SubClusterHeartbeatRequest .newInstance(subClusterId, lastHeartBeat, stateLost, capabilityEmpty); FederationMembershipStateStoreInputValidator - .validateSubClusterHeartbeatRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -791,7 +791,7 @@ public void testGetSubClusterInfoRequest() { GetSubClusterInfoRequest request = GetSubClusterInfoRequest.newInstance(subClusterId); FederationMembershipStateStoreInputValidator - .validateGetSubClusterInfoRequest(request); + .validate(request); } catch (FederationStateStoreInvalidInputException e) { Assert.fail(e.getMessage()); } @@ -801,7 +801,7 @@ public void testGetSubClusterInfoRequest() { try { GetSubClusterInfoRequest request = null; FederationMembershipStateStoreInputValidator - .validateGetSubClusterInfoRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -815,7 +815,7 @@ public void testGetSubClusterInfoRequest() { GetSubClusterInfoRequest request = GetSubClusterInfoRequest.newInstance(subClusterIdNull); FederationMembershipStateStoreInputValidator - .validateGetSubClusterInfoRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -829,7 +829,7 @@ public void testGetSubClusterInfoRequest() { GetSubClusterInfoRequest request = GetSubClusterInfoRequest.newInstance(subClusterIdInvalid); FederationMembershipStateStoreInputValidator - .validateGetSubClusterInfoRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -850,7 +850,7 @@ public void testAddApplicationHomeSubClusterRequest() { AddApplicationHomeSubClusterRequest .newInstance(applicationHomeSubCluster); FederationApplicationHomeSubClusterStoreInputValidator - .validateAddApplicationHomeSubClusterRequest(request); + .validate(request); } catch (FederationStateStoreInvalidInputException e) { Assert.fail(e.getMessage()); } @@ -860,7 +860,7 @@ public void testAddApplicationHomeSubClusterRequest() { try { AddApplicationHomeSubClusterRequest request = null; FederationApplicationHomeSubClusterStoreInputValidator - .validateAddApplicationHomeSubClusterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { Assert.assertTrue(e.getMessage() @@ -875,7 +875,7 @@ public void testAddApplicationHomeSubClusterRequest() { AddApplicationHomeSubClusterRequest .newInstance(applicationHomeSubCluster); FederationApplicationHomeSubClusterStoreInputValidator - .validateAddApplicationHomeSubClusterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { Assert.assertTrue( @@ -891,7 +891,7 @@ public void testAddApplicationHomeSubClusterRequest() { AddApplicationHomeSubClusterRequest .newInstance(applicationHomeSubCluster); FederationApplicationHomeSubClusterStoreInputValidator - .validateAddApplicationHomeSubClusterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -908,7 +908,7 @@ public void testAddApplicationHomeSubClusterRequest() { AddApplicationHomeSubClusterRequest .newInstance(applicationHomeSubCluster); FederationApplicationHomeSubClusterStoreInputValidator - .validateAddApplicationHomeSubClusterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -925,7 +925,7 @@ public void testAddApplicationHomeSubClusterRequest() { AddApplicationHomeSubClusterRequest .newInstance(applicationHomeSubCluster); FederationApplicationHomeSubClusterStoreInputValidator - .validateAddApplicationHomeSubClusterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { Assert.assertTrue(e.getMessage().startsWith("Missing Application Id.")); @@ -944,7 +944,7 @@ public void testUpdateApplicationHomeSubClusterRequest() { UpdateApplicationHomeSubClusterRequest .newInstance(applicationHomeSubCluster); FederationApplicationHomeSubClusterStoreInputValidator - .validateUpdateApplicationHomeSubClusterRequest(request); + .validate(request); } catch (FederationStateStoreInvalidInputException e) { Assert.fail(e.getMessage()); } @@ -954,7 +954,7 @@ public void testUpdateApplicationHomeSubClusterRequest() { try { UpdateApplicationHomeSubClusterRequest request = null; FederationApplicationHomeSubClusterStoreInputValidator - .validateUpdateApplicationHomeSubClusterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { Assert.assertTrue(e.getMessage() @@ -969,7 +969,7 @@ public void testUpdateApplicationHomeSubClusterRequest() { UpdateApplicationHomeSubClusterRequest .newInstance(applicationHomeSubCluster); FederationApplicationHomeSubClusterStoreInputValidator - .validateUpdateApplicationHomeSubClusterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { Assert.assertTrue( @@ -985,7 +985,7 @@ public void testUpdateApplicationHomeSubClusterRequest() { UpdateApplicationHomeSubClusterRequest .newInstance(applicationHomeSubCluster); FederationApplicationHomeSubClusterStoreInputValidator - .validateUpdateApplicationHomeSubClusterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -1002,7 +1002,7 @@ public void testUpdateApplicationHomeSubClusterRequest() { UpdateApplicationHomeSubClusterRequest .newInstance(applicationHomeSubCluster); FederationApplicationHomeSubClusterStoreInputValidator - .validateUpdateApplicationHomeSubClusterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -1019,7 +1019,7 @@ public void testUpdateApplicationHomeSubClusterRequest() { UpdateApplicationHomeSubClusterRequest .newInstance(applicationHomeSubCluster); FederationApplicationHomeSubClusterStoreInputValidator - .validateUpdateApplicationHomeSubClusterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { Assert.assertTrue(e.getMessage().startsWith("Missing Application Id.")); @@ -1035,7 +1035,7 @@ public void testGetApplicationHomeSubClusterRequest() { GetApplicationHomeSubClusterRequest request = GetApplicationHomeSubClusterRequest.newInstance(appId); FederationApplicationHomeSubClusterStoreInputValidator - .validateGetApplicationHomeSubClusterRequest(request); + .validate(request); } catch (FederationStateStoreInvalidInputException e) { Assert.fail(e.getMessage()); } @@ -1045,7 +1045,7 @@ public void testGetApplicationHomeSubClusterRequest() { try { GetApplicationHomeSubClusterRequest request = null; FederationApplicationHomeSubClusterStoreInputValidator - .validateGetApplicationHomeSubClusterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { Assert.assertTrue(e.getMessage() @@ -1058,7 +1058,7 @@ public void testGetApplicationHomeSubClusterRequest() { GetApplicationHomeSubClusterRequest request = GetApplicationHomeSubClusterRequest.newInstance(appIdNull); FederationApplicationHomeSubClusterStoreInputValidator - .validateGetApplicationHomeSubClusterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { Assert.assertTrue(e.getMessage().startsWith("Missing Application Id.")); @@ -1075,7 +1075,7 @@ public void testDeleteApplicationHomeSubClusterRequestNull() { DeleteApplicationHomeSubClusterRequest request = DeleteApplicationHomeSubClusterRequest.newInstance(appId); FederationApplicationHomeSubClusterStoreInputValidator - .validateDeleteApplicationHomeSubClusterRequest(request); + .validate(request); } catch (FederationStateStoreInvalidInputException e) { Assert.fail(e.getMessage()); } @@ -1085,7 +1085,7 @@ public void testDeleteApplicationHomeSubClusterRequestNull() { try { DeleteApplicationHomeSubClusterRequest request = null; FederationApplicationHomeSubClusterStoreInputValidator - .validateDeleteApplicationHomeSubClusterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { Assert.assertTrue(e.getMessage() @@ -1098,7 +1098,7 @@ public void testDeleteApplicationHomeSubClusterRequestNull() { DeleteApplicationHomeSubClusterRequest request = DeleteApplicationHomeSubClusterRequest.newInstance(appIdNull); FederationApplicationHomeSubClusterStoreInputValidator - .validateDeleteApplicationHomeSubClusterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { Assert.assertTrue(e.getMessage().startsWith("Missing Application Id.")); @@ -1115,7 +1115,7 @@ public void testGetSubClusterPolicyConfigurationRequest() { GetSubClusterPolicyConfigurationRequest request = GetSubClusterPolicyConfigurationRequest.newInstance(queue); FederationPolicyStoreInputValidator - .validateGetSubClusterPolicyConfigurationRequest(request); + .validate(request); } catch (FederationStateStoreInvalidInputException e) { Assert.fail(e.getMessage()); } @@ -1125,7 +1125,7 @@ public void testGetSubClusterPolicyConfigurationRequest() { try { GetSubClusterPolicyConfigurationRequest request = null; FederationPolicyStoreInputValidator - .validateGetSubClusterPolicyConfigurationRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { Assert.assertTrue(e.getMessage() @@ -1138,7 +1138,7 @@ public void testGetSubClusterPolicyConfigurationRequest() { GetSubClusterPolicyConfigurationRequest request = GetSubClusterPolicyConfigurationRequest.newInstance(queueNull); FederationPolicyStoreInputValidator - .validateGetSubClusterPolicyConfigurationRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { Assert.assertTrue(e.getMessage().startsWith("Missing Queue.")); @@ -1150,7 +1150,7 @@ public void testGetSubClusterPolicyConfigurationRequest() { GetSubClusterPolicyConfigurationRequest request = GetSubClusterPolicyConfigurationRequest.newInstance(queueEmpty); FederationPolicyStoreInputValidator - .validateGetSubClusterPolicyConfigurationRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { Assert.assertTrue(e.getMessage().startsWith("Missing Queue.")); @@ -1169,7 +1169,7 @@ public void testSetSubClusterPolicyConfigurationRequest() { SetSubClusterPolicyConfigurationRequest request = SetSubClusterPolicyConfigurationRequest.newInstance(policy); FederationPolicyStoreInputValidator - .validateSetSubClusterPolicyConfigurationRequest(request); + .validate(request); } catch (FederationStateStoreInvalidInputException e) { Assert.fail(e.getMessage()); } @@ -1179,7 +1179,7 @@ public void testSetSubClusterPolicyConfigurationRequest() { try { SetSubClusterPolicyConfigurationRequest request = null; FederationPolicyStoreInputValidator - .validateSetSubClusterPolicyConfigurationRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { Assert.assertTrue(e.getMessage() @@ -1193,7 +1193,7 @@ public void testSetSubClusterPolicyConfigurationRequest() { SetSubClusterPolicyConfigurationRequest request = SetSubClusterPolicyConfigurationRequest.newInstance(policy); FederationPolicyStoreInputValidator - .validateSetSubClusterPolicyConfigurationRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { Assert.assertTrue( @@ -1208,7 +1208,7 @@ public void testSetSubClusterPolicyConfigurationRequest() { SetSubClusterPolicyConfigurationRequest request = SetSubClusterPolicyConfigurationRequest.newInstance(policy); FederationPolicyStoreInputValidator - .validateSetSubClusterPolicyConfigurationRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { Assert.assertTrue(e.getMessage().startsWith("Missing Queue.")); @@ -1222,7 +1222,7 @@ public void testSetSubClusterPolicyConfigurationRequest() { SetSubClusterPolicyConfigurationRequest request = SetSubClusterPolicyConfigurationRequest.newInstance(policy); FederationPolicyStoreInputValidator - .validateSetSubClusterPolicyConfigurationRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { Assert.assertTrue(e.getMessage().startsWith("Missing Queue.")); @@ -1236,7 +1236,7 @@ public void testSetSubClusterPolicyConfigurationRequest() { SetSubClusterPolicyConfigurationRequest request = SetSubClusterPolicyConfigurationRequest.newInstance(policy); FederationPolicyStoreInputValidator - .validateSetSubClusterPolicyConfigurationRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { Assert.assertTrue(e.getMessage().startsWith("Missing Policy Type.")); @@ -1250,7 +1250,7 @@ public void testSetSubClusterPolicyConfigurationRequest() { SetSubClusterPolicyConfigurationRequest request = SetSubClusterPolicyConfigurationRequest.newInstance(policy); FederationPolicyStoreInputValidator - .validateSetSubClusterPolicyConfigurationRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { Assert.assertTrue(e.getMessage().startsWith("Missing Policy Type.")); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationStateStoreFacadeRetry.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationStateStoreFacadeRetry.java index 632e865d9a..304910eb36 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationStateStoreFacadeRetry.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationStateStoreFacadeRetry.java @@ -24,7 +24,6 @@ import org.apache.hadoop.io.retry.RetryPolicy.RetryAction; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreErrorCode; import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreException; import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreInvalidInputException; import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreRetriableException; @@ -82,10 +81,8 @@ public void testFacadeStateStoreException() throws Exception { conf = new Configuration(); conf.setInt(YarnConfiguration.CLIENT_FAILOVER_RETRIES, maxRetries); RetryPolicy policy = FederationStateStoreFacade.createRetryPolicy(conf); - RetryAction action = policy.shouldRetry( - new FederationStateStoreException( - FederationStateStoreErrorCode.APPLICATIONS_INSERT_FAIL), - 0, 0, false); + RetryAction action = policy + .shouldRetry(new FederationStateStoreException("Error"), 0, 0, false); Assert.assertEquals(RetryAction.FAIL.action, action.action); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/resources/FederationStateStore/SQLServer/FederationStateStoreStoreProcs.sql b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/resources/FederationStateStore/SQLServer/FederationStateStoreStoreProcs.sql new file mode 100644 index 0000000000..66d6f0e203 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/resources/FederationStateStore/SQLServer/FederationStateStoreStoreProcs.sql @@ -0,0 +1,511 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +USE [FederationStateStore] +GO + +IF OBJECT_ID ( '[sp_addApplicationHomeSubCluster]', 'P' ) IS NOT NULL + DROP PROCEDURE [sp_addApplicationHomeSubCluster]; +GO + +CREATE PROCEDURE [dbo].[sp_addApplicationHomeSubCluster] + @applicationId VARCHAR(64), + @homeSubCluster VARCHAR(256), + @storedHomeSubCluster VARCHAR(256) OUTPUT, + @rowCount int OUTPUT +AS BEGIN + DECLARE @errorMessage nvarchar(4000) + + BEGIN TRY + BEGIN TRAN + -- If application to sub-cluster map doesn't exist, insert it. + -- Otherwise don't change the current mapping. + IF NOT EXISTS (SELECT TOP 1 * + FROM [dbo].[applicationsHomeSubCluster] + WHERE [applicationId] = @applicationId) + + INSERT INTO [dbo].[applicationsHomeSubCluster] ( + [applicationId], + [homeSubCluster]) + VALUES ( + @applicationId, + @homeSubCluster); + -- End of the IF block + + SELECT @rowCount = @@ROWCOUNT; + + SELECT @storedHomeSubCluster = [homeSubCluster] + FROM [dbo].[applicationsHomeSubCluster] + WHERE [applicationId] = @applicationId; + + COMMIT TRAN + END TRY + + BEGIN CATCH + ROLLBACK TRAN + + SET @errorMessage = dbo.func_FormatErrorMessage(ERROR_MESSAGE(), ERROR_LINE()) + + /* raise error and terminate the execution */ + RAISERROR(@errorMessage, --- Error Message + 1, -- Severity + -1 -- State + ) WITH log + END CATCH +END; +GO + +IF OBJECT_ID ( '[sp_updateApplicationHomeSubCluster]', 'P' ) IS NOT NULL + DROP PROCEDURE [sp_updateApplicationHomeSubCluster]; +GO + +CREATE PROCEDURE [dbo].[sp_updateApplicationHomeSubCluster] + @applicationId VARCHAR(64), + @homeSubCluster VARCHAR(256), + @rowCount int OUTPUT +AS BEGIN + DECLARE @errorMessage nvarchar(4000) + + BEGIN TRY + BEGIN TRAN + + UPDATE [dbo].[applicationsHomeSubCluster] + SET [homeSubCluster] = @homeSubCluster + WHERE [applicationId] = @applicationid; + SELECT @rowCount = @@ROWCOUNT; + + COMMIT TRAN + END TRY + + BEGIN CATCH + ROLLBACK TRAN + + SET @errorMessage = dbo.func_FormatErrorMessage(ERROR_MESSAGE(), ERROR_LINE()) + + /* raise error and terminate the execution */ + RAISERROR(@errorMessage, --- Error Message + 1, -- Severity + -1 -- State + ) WITH log + END CATCH +END; +GO + +IF OBJECT_ID ( '[sp_getApplicationsHomeSubCluster]', 'P' ) IS NOT NULL + DROP PROCEDURE [sp_getApplicationsHomeSubCluster]; +GO + +CREATE PROCEDURE [dbo].[sp_getApplicationsHomeSubCluster] +AS BEGIN + DECLARE @errorMessage nvarchar(4000) + + BEGIN TRY + SELECT [applicationId], [homeSubCluster], [createTime] + FROM [dbo].[applicationsHomeSubCluster] + END TRY + + BEGIN CATCH + SET @errorMessage = dbo.func_FormatErrorMessage(ERROR_MESSAGE(), ERROR_LINE()) + + /* raise error and terminate the execution */ + RAISERROR(@errorMessage, --- Error Message + 1, -- Severity + -1 -- State + ) WITH log + END CATCH +END; +GO + +IF OBJECT_ID ( '[sp_getApplicationHomeSubCluster]', 'P' ) IS NOT NULL + DROP PROCEDURE [sp_getApplicationHomeSubCluster]; +GO + +CREATE PROCEDURE [dbo].[sp_getApplicationHomeSubCluster] + @applicationId VARCHAR(64), + @homeSubCluster VARCHAR(256) OUTPUT +AS BEGIN + DECLARE @errorMessage nvarchar(4000) + + BEGIN TRY + + SELECT @homeSubCluster = [homeSubCluster] + FROM [dbo].[applicationsHomeSubCluster] + WHERE [applicationId] = @applicationid; + + END TRY + + BEGIN CATCH + + SET @errorMessage = dbo.func_FormatErrorMessage(ERROR_MESSAGE(), ERROR_LINE()) + + /* raise error and terminate the execution */ + RAISERROR(@errorMessage, --- Error Message + 1, -- Severity + -1 -- State + ) WITH log + END CATCH +END; +GO + +IF OBJECT_ID ( '[sp_deleteApplicationHomeSubCluster]', 'P' ) IS NOT NULL + DROP PROCEDURE [sp_deleteApplicationHomeSubCluster]; +GO + +CREATE PROCEDURE [dbo].[sp_deleteApplicationHomeSubCluster] + @applicationId VARCHAR(64), + @rowCount int OUTPUT +AS BEGIN + DECLARE @errorMessage nvarchar(4000) + + BEGIN TRY + BEGIN TRAN + + DELETE FROM [dbo].[applicationsHomeSubCluster] + WHERE [applicationId] = @applicationId; + SELECT @rowCount = @@ROWCOUNT; + + COMMIT TRAN + END TRY + + BEGIN CATCH + ROLLBACK TRAN + + SET @errorMessage = dbo.func_FormatErrorMessage(ERROR_MESSAGE(), ERROR_LINE()) + + /* raise error and terminate the execution */ + RAISERROR(@errorMessage, --- Error Message + 1, -- Severity + -1 -- State + ) WITH log + END CATCH +END; +GO + +IF OBJECT_ID ( '[sp_registerSubCluster]', 'P' ) IS NOT NULL + DROP PROCEDURE [sp_registerSubCluster]; +GO + +CREATE PROCEDURE [dbo].[sp_registerSubCluster] + @subClusterId VARCHAR(256), + @amRMServiceAddress VARCHAR(256), + @clientRMServiceAddress VARCHAR(256), + @rmAdminServiceAddress VARCHAR(256), + @rmWebServiceAddress VARCHAR(256), + @state VARCHAR(32), + @lastStartTime BIGINT, + @capability VARCHAR(6000), + @rowCount int OUTPUT +AS BEGIN + DECLARE @errorMessage nvarchar(4000) + + BEGIN TRY + BEGIN TRAN + + DELETE FROM [dbo].[membership] + WHERE [subClusterId] = @subClusterId; + INSERT INTO [dbo].[membership] ( + [subClusterId], + [amRMServiceAddress], + [clientRMServiceAddress], + [rmAdminServiceAddress], + [rmWebServiceAddress], + [lastHeartBeat], + [state], + [lastStartTime], + [capability] ) + VALUES ( + @subClusterId, + @amRMServiceAddress, + @clientRMServiceAddress, + @rmAdminServiceAddress, + @rmWebServiceAddress, + GETUTCDATE(), + @state, + @lastStartTime, + @capability); + SELECT @rowCount = @@ROWCOUNT; + + COMMIT TRAN + END TRY + + BEGIN CATCH + ROLLBACK TRAN + + SET @errorMessage = dbo.func_FormatErrorMessage(ERROR_MESSAGE(), ERROR_LINE()) + + /* raise error and terminate the execution */ + RAISERROR(@errorMessage, --- Error Message + 1, -- Severity + -1 -- State + ) WITH log + END CATCH +END; +GO + +IF OBJECT_ID ( '[sp_getSubClusters]', 'P' ) IS NOT NULL + DROP PROCEDURE [sp_getSubClusters]; +GO + +CREATE PROCEDURE [dbo].[sp_getSubClusters] +AS BEGIN + DECLARE @errorMessage nvarchar(4000) + + BEGIN TRY + SELECT [subClusterId], [amRMServiceAddress], [clientRMServiceAddress], + [rmAdminServiceAddress], [rmWebServiceAddress], [lastHeartBeat], + [state], [lastStartTime], [capability] + FROM [dbo].[membership] + END TRY + + BEGIN CATCH + SET @errorMessage = dbo.func_FormatErrorMessage(ERROR_MESSAGE(), ERROR_LINE()) + + /* raise error and terminate the execution */ + RAISERROR(@errorMessage, --- Error Message + 1, -- Severity + -1 -- State + ) WITH log + END CATCH +END; +GO + +IF OBJECT_ID ( '[sp_getSubCluster]', 'P' ) IS NOT NULL + DROP PROCEDURE [sp_getSubCluster]; +GO + +CREATE PROCEDURE [dbo].[sp_getSubCluster] + @subClusterId VARCHAR(256), + @amRMServiceAddress VARCHAR(256) OUTPUT, + @clientRMServiceAddress VARCHAR(256) OUTPUT, + @rmAdminServiceAddress VARCHAR(256) OUTPUT, + @rmWebServiceAddress VARCHAR(256) OUTPUT, + @lastHeartbeat DATETIME2 OUTPUT, + @state VARCHAR(256) OUTPUT, + @lastStartTime BIGINT OUTPUT, + @capability VARCHAR(6000) OUTPUT +AS BEGIN + DECLARE @errorMessage nvarchar(4000) + + BEGIN TRY + BEGIN TRAN + + SELECT @subClusterId = [subClusterId], + @amRMServiceAddress = [amRMServiceAddress], + @clientRMServiceAddress = [clientRMServiceAddress], + @rmAdminServiceAddress = [rmAdminServiceAddress], + @rmWebServiceAddress = [rmWebServiceAddress], + @lastHeartBeat = [lastHeartBeat], + @state = [state], + @lastStartTime = [lastStartTime], + @capability = [capability] + FROM [dbo].[membership] + WHERE [subClusterId] = @subClusterId + + COMMIT TRAN + END TRY + + BEGIN CATCH + ROLLBACK TRAN + + SET @errorMessage = dbo.func_FormatErrorMessage(ERROR_MESSAGE(), ERROR_LINE()) + + /* raise error and terminate the execution */ + RAISERROR(@errorMessage, --- Error Message + 1, -- Severity + -1 -- State + ) WITH log + END CATCH +END; +GO + + +IF OBJECT_ID ( '[sp_subClusterHeartbeat]', 'P' ) IS NOT NULL + DROP PROCEDURE [sp_subClusterHeartbeat]; +GO + +CREATE PROCEDURE [dbo].[sp_subClusterHeartbeat] + @subClusterId VARCHAR(256), + @state VARCHAR(256), + @capability VARCHAR(6000), + @rowCount int OUTPUT +AS BEGIN + DECLARE @errorMessage nvarchar(4000) + + BEGIN TRY + BEGIN TRAN + + UPDATE [dbo].[membership] + SET [state] = @state, + [lastHeartbeat] = GETUTCDATE(), + [capability] = @capability + WHERE [subClusterId] = @subClusterId; + SELECT @rowCount = @@ROWCOUNT; + + COMMIT TRAN + END TRY + + BEGIN CATCH + ROLLBACK TRAN + + SET @errorMessage = dbo.func_FormatErrorMessage(ERROR_MESSAGE(), ERROR_LINE()) + + /* raise error and terminate the execution */ + RAISERROR(@errorMessage, --- Error Message + 1, -- Severity + -1 -- State + ) WITH log + END CATCH +END; +GO + +IF OBJECT_ID ( '[sp_deregisterSubCluster]', 'P' ) IS NOT NULL + DROP PROCEDURE [sp_deregisterSubCluster]; +GO + +CREATE PROCEDURE [dbo].[sp_deregisterSubCluster] + @subClusterId VARCHAR(256), + @state VARCHAR(256), + @rowCount int OUTPUT +AS BEGIN + DECLARE @errorMessage nvarchar(4000) + + BEGIN TRY + BEGIN TRAN + + UPDATE [dbo].[membership] + SET [state] = @state + WHERE [subClusterId] = @subClusterId; + SELECT @rowCount = @@ROWCOUNT; + + COMMIT TRAN + END TRY + + BEGIN CATCH + ROLLBACK TRAN + + SET @errorMessage = dbo.func_FormatErrorMessage(ERROR_MESSAGE(), ERROR_LINE()) + + /* raise error and terminate the execution */ + RAISERROR(@errorMessage, --- Error Message + 1, -- Severity + -1 -- State + ) WITH log + END CATCH +END; +GO + +IF OBJECT_ID ( '[sp_setPolicyConfiguration]', 'P' ) IS NOT NULL + DROP PROCEDURE [sp_setPolicyConfiguration]; +GO + +CREATE PROCEDURE [dbo].[sp_setPolicyConfiguration] + @queue VARCHAR(256), + @policyType VARCHAR(256), + @params VARBINARY(512), + @rowCount int OUTPUT +AS BEGIN + DECLARE @errorMessage nvarchar(4000) + + BEGIN TRY + BEGIN TRAN + + DELETE FROM [dbo].[policies] + WHERE [queue] = @queue; + INSERT INTO [dbo].[policies] ( + [queue], + [policyType], + [params]) + VALUES ( + @queue, + @policyType, + @params); + SELECT @rowCount = @@ROWCOUNT; + + COMMIT TRAN + END TRY + + BEGIN CATCH + ROLLBACK TRAN + + SET @errorMessage = dbo.func_FormatErrorMessage(ERROR_MESSAGE(), ERROR_LINE()) + + /* raise error and terminate the execution */ + RAISERROR(@errorMessage, --- Error Message + 1, -- Severity + -1 -- State + ) WITH log + END CATCH +END; +GO + +IF OBJECT_ID ( '[sp_getPolicyConfiguration]', 'P' ) IS NOT NULL + DROP PROCEDURE [sp_getPolicyConfiguration]; +GO + +CREATE PROCEDURE [dbo].[sp_getPolicyConfiguration] + @queue VARCHAR(256), + @policyType VARCHAR(256) OUTPUT, + @params VARBINARY(6000) OUTPUT +AS BEGIN + DECLARE @errorMessage nvarchar(4000) + + BEGIN TRY + + SELECT @policyType = [policyType], + @params = [params] + FROM [dbo].[policies] + WHERE [queue] = @queue + + END TRY + + BEGIN CATCH + + SET @errorMessage = dbo.func_FormatErrorMessage(ERROR_MESSAGE(), ERROR_LINE()) + + /* raise error and terminate the execution */ + RAISERROR(@errorMessage, --- Error Message + 1, -- Severity + -1 -- State + ) WITH log + END CATCH +END; +GO + +IF OBJECT_ID ( '[sp_getPoliciesConfigurations]', 'P' ) IS NOT NULL + DROP PROCEDURE [sp_getPoliciesConfigurations]; +GO + +CREATE PROCEDURE [dbo].[sp_getPoliciesConfigurations] +AS BEGIN + DECLARE @errorMessage nvarchar(4000) + + BEGIN TRY + SELECT [queue], [policyType], [params] FROM [dbo].[policies] + END TRY + + BEGIN CATCH + SET @errorMessage = dbo.func_FormatErrorMessage(ERROR_MESSAGE(), ERROR_LINE()) + + /* raise error and terminate the execution */ + RAISERROR(@errorMessage, --- Error Message + 1, -- Severity + -1 -- State + ) WITH log + END CATCH +END; +GO \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/resources/FederationStateStore/SQLServer/FederationStateStoreTables.sql b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/resources/FederationStateStore/SQLServer/FederationStateStoreTables.sql new file mode 100644 index 0000000000..a97385b496 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/resources/FederationStateStore/SQLServer/FederationStateStoreTables.sql @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +USE [FederationStateStore] +GO + +IF NOT EXISTS ( SELECT * FROM [FederationStateStore].sys.tables + WHERE name = 'applicationsHomeSubCluster' + AND schema_id = SCHEMA_ID('dbo')) + BEGIN + PRINT 'Table applicationsHomeSubCluster does not exist, create it...' + + SET ANSI_NULLS ON + + SET QUOTED_IDENTIFIER ON + + SET ANSI_PADDING ON + + CREATE TABLE [dbo].[applicationsHomeSubCluster]( + applicationId VARCHAR(64) COLLATE Latin1_General_100_BIN2 NOT NULL, + homeSubCluster VARCHAR(256) NOT NULL, + createTime DATETIME2 NOT NULL CONSTRAINT ts_createAppTime DEFAULT GETUTCDATE(), + + CONSTRAINT [pk_applicationId] PRIMARY KEY + ( + [applicationId] + ) + ) + + SET ANSI_PADDING OFF + + PRINT 'Table applicationsHomeSubCluster created.' + END +ELSE + PRINT 'Table applicationsHomeSubCluster exists, no operation required...' + GO +GO + +IF NOT EXISTS ( SELECT * FROM [FederationStateStore].sys.tables + WHERE name = 'membership' + AND schema_id = SCHEMA_ID('dbo')) + BEGIN + PRINT 'Table membership does not exist, create it...' + + SET ANSI_NULLS ON + + SET QUOTED_IDENTIFIER ON + + SET ANSI_PADDING ON + + CREATE TABLE [dbo].[membership]( + [subClusterId] VARCHAR(256) COLLATE Latin1_General_100_BIN2 NOT NULL, + [amRMServiceAddress] VARCHAR(256) NOT NULL, + [clientRMServiceAddress] VARCHAR(256) NOT NULL, + [rmAdminServiceAddress] VARCHAR(256) NOT NULL, + [rmWebServiceAddress] VARCHAR(256) NOT NULL, + [lastHeartBeat] DATETIME2 NOT NULL, + [state] VARCHAR(32) NOT NULL, + [lastStartTime] BIGINT NOT NULL, + [capability] VARCHAR(6000) NOT NULL, + + CONSTRAINT [pk_subClusterId] PRIMARY KEY + ( + [subClusterId] + ) + ) + + SET ANSI_PADDING OFF + + PRINT 'Table membership created.' + END +ELSE + PRINT 'Table membership exists, no operation required...' + GO +GO + +IF NOT EXISTS ( SELECT * FROM [FederationStateStore].sys.tables + WHERE name = 'policies' + AND schema_id = SCHEMA_ID('dbo')) + BEGIN + PRINT 'Table policies does not exist, create it...' + + SET ANSI_NULLS ON + + SET QUOTED_IDENTIFIER ON + + SET ANSI_PADDING ON + + CREATE TABLE [dbo].[policies]( + queue VARCHAR(256) COLLATE Latin1_General_100_BIN2 NOT NULL, + policyType VARCHAR(256) NOT NULL, + params VARBINARY(6000) NOT NULL, + + CONSTRAINT [pk_queue] PRIMARY KEY + ( + [queue] + ) + ) + + SET ANSI_PADDING OFF + + PRINT 'Table policies created.' + END +ELSE + PRINT 'Table policies exists, no operation required...' + GO +GO