- * Logical error codes from FederationStateStore
.
- *
@@ -61,4 +63,23 @@ public boolean isFinal() {
return (this == SC_UNREGISTERED || this == SC_DECOMMISSIONED
|| this == SC_LOST);
}
+
+ public static final Logger LOG =
+ LoggerFactory.getLogger(SubClusterState.class);
+
+ /**
+ * Convert a string into {@code SubClusterState}.
+ *
+ * @param x the string to convert in SubClusterState
+ * @return the respective {@code SubClusterState}
+ */
+ public static SubClusterState fromString(String x) {
+ try {
+ return SubClusterState.valueOf(x);
+ } catch (Exception e) {
+ LOG.error("Invalid SubCluster State value in the StateStore does not"
+ + " match with the YARN Federation standard.");
+ return null;
+ }
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationApplicationHomeSubClusterStoreInputValidator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationApplicationHomeSubClusterStoreInputValidator.java
index d920144e7a..0184c9fad6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationApplicationHomeSubClusterStoreInputValidator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationApplicationHomeSubClusterStoreInputValidator.java
@@ -51,8 +51,7 @@ private FederationApplicationHomeSubClusterStoreInputValidator() {
* against
* @throws FederationStateStoreInvalidInputException if the request is invalid
*/
- public static void validateAddApplicationHomeSubClusterRequest(
- AddApplicationHomeSubClusterRequest request)
+ public static void validate(AddApplicationHomeSubClusterRequest request)
throws FederationStateStoreInvalidInputException {
if (request == null) {
String message = "Missing AddApplicationHomeSubCluster Request."
@@ -75,8 +74,7 @@ public static void validateAddApplicationHomeSubClusterRequest(
* validate against
* @throws FederationStateStoreInvalidInputException if the request is invalid
*/
- public static void validateUpdateApplicationHomeSubClusterRequest(
- UpdateApplicationHomeSubClusterRequest request)
+ public static void validate(UpdateApplicationHomeSubClusterRequest request)
throws FederationStateStoreInvalidInputException {
if (request == null) {
String message = "Missing UpdateApplicationHomeSubCluster Request."
@@ -99,8 +97,7 @@ public static void validateUpdateApplicationHomeSubClusterRequest(
* against
* @throws FederationStateStoreInvalidInputException if the request is invalid
*/
- public static void validateGetApplicationHomeSubClusterRequest(
- GetApplicationHomeSubClusterRequest request)
+ public static void validate(GetApplicationHomeSubClusterRequest request)
throws FederationStateStoreInvalidInputException {
if (request == null) {
String message = "Missing GetApplicationHomeSubCluster Request."
@@ -122,8 +119,7 @@ public static void validateGetApplicationHomeSubClusterRequest(
* validate against
* @throws FederationStateStoreInvalidInputException if the request is invalid
*/
- public static void validateDeleteApplicationHomeSubClusterRequest(
- DeleteApplicationHomeSubClusterRequest request)
+ public static void validate(DeleteApplicationHomeSubClusterRequest request)
throws FederationStateStoreInvalidInputException {
if (request == null) {
String message = "Missing DeleteApplicationHomeSubCluster Request."
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationMembershipStateStoreInputValidator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationMembershipStateStoreInputValidator.java
index ebe622b96b..0ec8e5de6f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationMembershipStateStoreInputValidator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationMembershipStateStoreInputValidator.java
@@ -53,8 +53,7 @@ private FederationMembershipStateStoreInputValidator() {
* @param request the {@link SubClusterRegisterRequest} to validate against
* @throws FederationStateStoreInvalidInputException if the request is invalid
*/
- public static void validateSubClusterRegisterRequest(
- SubClusterRegisterRequest request)
+ public static void validate(SubClusterRegisterRequest request)
throws FederationStateStoreInvalidInputException {
// check if the request is present
@@ -79,8 +78,7 @@ public static void validateSubClusterRegisterRequest(
* @param request the {@link SubClusterDeregisterRequest} to validate against
* @throws FederationStateStoreInvalidInputException if the request is invalid
*/
- public static void validateSubClusterDeregisterRequest(
- SubClusterDeregisterRequest request)
+ public static void validate(SubClusterDeregisterRequest request)
throws FederationStateStoreInvalidInputException {
// check if the request is present
@@ -111,8 +109,7 @@ public static void validateSubClusterDeregisterRequest(
* @param request the {@link SubClusterHeartbeatRequest} to validate against
* @throws FederationStateStoreInvalidInputException if the request is invalid
*/
- public static void validateSubClusterHeartbeatRequest(
- SubClusterHeartbeatRequest request)
+ public static void validate(SubClusterHeartbeatRequest request)
throws FederationStateStoreInvalidInputException {
// check if the request is present
@@ -143,8 +140,7 @@ public static void validateSubClusterHeartbeatRequest(
* @param request the {@link GetSubClusterInfoRequest} to validate against
* @throws FederationStateStoreInvalidInputException if the request is invalid
*/
- public static void validateGetSubClusterInfoRequest(
- GetSubClusterInfoRequest request)
+ public static void validate(GetSubClusterInfoRequest request)
throws FederationStateStoreInvalidInputException {
// check if the request is present
@@ -169,7 +165,7 @@ public static void validateGetSubClusterInfoRequest(
* @throws FederationStateStoreInvalidInputException if the SubCluster Info
* are invalid
*/
- private static void checkSubClusterInfo(SubClusterInfo subClusterInfo)
+ public static void checkSubClusterInfo(SubClusterInfo subClusterInfo)
throws FederationStateStoreInvalidInputException {
if (subClusterInfo == null) {
String message = "Missing SubCluster Information."
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationPolicyStoreInputValidator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationPolicyStoreInputValidator.java
index 0df2d85856..3c68bfdace 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationPolicyStoreInputValidator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationPolicyStoreInputValidator.java
@@ -48,8 +48,7 @@ private FederationPolicyStoreInputValidator() {
* validate against
* @throws FederationStateStoreInvalidInputException if the request is invalid
*/
- public static void validateGetSubClusterPolicyConfigurationRequest(
- GetSubClusterPolicyConfigurationRequest request)
+ public static void validate(GetSubClusterPolicyConfigurationRequest request)
throws FederationStateStoreInvalidInputException {
if (request == null) {
String message = "Missing GetSubClusterPolicyConfiguration Request."
@@ -72,8 +71,7 @@ public static void validateGetSubClusterPolicyConfigurationRequest(
* validate against
* @throws FederationStateStoreInvalidInputException if the request is invalid
*/
- public static void validateSetSubClusterPolicyConfigurationRequest(
- SetSubClusterPolicyConfigurationRequest request)
+ public static void validate(SetSubClusterPolicyConfigurationRequest request)
throws FederationStateStoreInvalidInputException {
if (request == null) {
String message = "Missing SetSubClusterPolicyConfiguration Request."
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationStateStoreUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationStateStoreUtils.java
index 7dbb20aaf0..3b870debef 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationStateStoreUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationStateStoreUtils.java
@@ -20,16 +20,18 @@
import java.sql.CallableStatement;
import java.sql.Connection;
+import java.sql.ResultSet;
import java.sql.SQLException;
import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreErrorCode;
import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreException;
import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreInvalidInputException;
import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreRetriableException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.zaxxer.hikari.HikariDataSource;
+
/**
* Common utility methods used by the store implementations.
*
@@ -39,19 +41,22 @@ public final class FederationStateStoreUtils {
public static final Logger LOG =
LoggerFactory.getLogger(FederationStateStoreUtils.class);
+ public final static String FEDERATION_STORE_URL = "url";
+
private FederationStateStoreUtils() {
}
/**
- * Returns the SQL FederationStateStore
connection to the pool.
+ * Returns the SQL FederationStateStore
connections to the pool.
*
* @param log the logger interface
* @param cstmt the interface used to execute SQL stored procedures
* @param conn the SQL connection
+ * @param rs the ResultSet interface used to execute SQL stored procedures
* @throws YarnException on failure
*/
public static void returnToPool(Logger log, CallableStatement cstmt,
- Connection conn) throws YarnException {
+ Connection conn, ResultSet rs) throws YarnException {
if (cstmt != null) {
try {
cstmt.close();
@@ -69,6 +74,28 @@ public static void returnToPool(Logger log, CallableStatement cstmt,
e);
}
}
+
+ if (rs != null) {
+ try {
+ rs.close();
+ } catch (SQLException e) {
+ logAndThrowException(log, "Exception while trying to close ResultSet",
+ e);
+ }
+ }
+ }
+
+ /**
+ * Returns the SQL FederationStateStore
connections to the pool.
+ *
+ * @param log the logger interface
+ * @param cstmt the interface used to execute SQL stored procedures
+ * @param conn the SQL connection
+ * @throws YarnException on failure
+ */
+ public static void returnToPool(Logger log, CallableStatement cstmt,
+ Connection conn) throws YarnException {
+ returnToPool(log, cstmt, conn, null);
}
/**
@@ -95,28 +122,13 @@ public static void logAndThrowException(Logger log, String errMsg,
* FederationStateStore
.
*
* @param log the logger interface
- * @param code FederationStateStoreErrorCode of the error
* @param errMsg the error message
* @throws YarnException on failure
*/
- public static void logAndThrowStoreException(Logger log,
- FederationStateStoreErrorCode code, String errMsg) throws YarnException {
- log.error(errMsg + " " + code.toString());
- throw new FederationStateStoreException(code);
- }
-
- /**
- * Throws an FederationStateStoreException
due to an error in
- * FederationStateStore
.
- *
- * @param log the logger interface
- * @param code FederationStateStoreErrorCode of the error
- * @throws YarnException on failure
- */
- public static void logAndThrowStoreException(Logger log,
- FederationStateStoreErrorCode code) throws YarnException {
- log.error(code.toString());
- throw new FederationStateStoreException(code);
+ public static void logAndThrowStoreException(Logger log, String errMsg)
+ throws YarnException {
+ log.error(errMsg);
+ throw new FederationStateStoreException(errMsg);
}
/**
@@ -129,7 +141,7 @@ public static void logAndThrowStoreException(Logger log,
*/
public static void logAndThrowInvalidInputException(Logger log, String errMsg)
throws YarnException {
- LOG.error(errMsg);
+ log.error(errMsg);
throw new FederationStateStoreInvalidInputException(errMsg);
}
@@ -145,11 +157,58 @@ public static void logAndThrowInvalidInputException(Logger log, String errMsg)
public static void logAndThrowRetriableException(Logger log, String errMsg,
Throwable t) throws YarnException {
if (t != null) {
- LOG.error(errMsg, t);
+ log.error(errMsg, t);
throw new FederationStateStoreRetriableException(errMsg, t);
} else {
- LOG.error(errMsg);
+ log.error(errMsg);
throw new FederationStateStoreRetriableException(errMsg);
}
}
+
+ /**
+ * Sets a specific value for a specific property of
+ * HikariDataSource
SQL connections.
+ *
+ * @param dataSource the HikariDataSource
connections
+ * @param property the property to set
+ * @param value the value to set
+ */
+ public static void setProperty(HikariDataSource dataSource, String property,
+ String value) {
+ LOG.debug("Setting property {} with value {}", property, value);
+ if (property != null && !property.isEmpty() && value != null) {
+ dataSource.addDataSourceProperty(property, value);
+ }
+ }
+
+ /**
+ * Sets a specific username for HikariDataSource
SQL connections.
+ *
+ * @param dataSource the HikariDataSource
connections
+ * @param userNameDB the value to set
+ */
+ public static void setUsername(HikariDataSource dataSource,
+ String userNameDB) {
+ if (userNameDB != null) {
+ dataSource.setUsername(userNameDB);
+ LOG.debug("Setting non NULL Username for Store connection");
+ } else {
+ LOG.debug("NULL Username specified for Store connection, so ignoring");
+ }
+ }
+
+ /**
+ * Sets a specific password for HikariDataSource
SQL connections.
+ *
+ * @param dataSource the HikariDataSource
connections
+ * @param password the value to set
+ */
+ public static void setPassword(HikariDataSource dataSource, String password) {
+ if (password != null) {
+ dataSource.setPassword(password);
+ LOG.debug("Setting non NULL Credentials for Store connection");
+ } else {
+ LOG.debug("NULL Credentials specified for Store connection, so ignoring");
+ }
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java
index 80b00ef79d..db04592e3a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java
@@ -19,13 +19,14 @@
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.Calendar;
import java.util.List;
+import java.util.TimeZone;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
-import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreErrorCode;
import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreException;
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterResponse;
@@ -87,13 +88,26 @@ public void after() throws Exception {
@Test
public void testRegisterSubCluster() throws Exception {
SubClusterId subClusterId = SubClusterId.newInstance("SC");
+
SubClusterInfo subClusterInfo = createSubClusterInfo(subClusterId);
+ long previousTimeStamp =
+ Calendar.getInstance(TimeZone.getTimeZone("UTC")).getTimeInMillis();
+
SubClusterRegisterResponse result = stateStore.registerSubCluster(
SubClusterRegisterRequest.newInstance(subClusterInfo));
+ long currentTimeStamp =
+ Calendar.getInstance(TimeZone.getTimeZone("UTC")).getTimeInMillis();
+
Assert.assertNotNull(result);
Assert.assertEquals(subClusterInfo, querySubClusterInfo(subClusterId));
+
+ // The saved heartbeat is between the old one and the current timestamp
+ Assert.assertTrue(querySubClusterInfo(subClusterId)
+ .getLastHeartBeat() <= currentTimeStamp);
+ Assert.assertTrue(querySubClusterInfo(subClusterId)
+ .getLastHeartBeat() >= previousTimeStamp);
}
@Test
@@ -120,9 +134,7 @@ public void testDeregisterSubClusterUnknownSubCluster() throws Exception {
stateStore.deregisterSubCluster(deregisterRequest);
Assert.fail();
} catch (FederationStateStoreException e) {
- Assert.assertEquals(
- FederationStateStoreErrorCode.MEMBERSHIP_UPDATE_DEREGISTER_FAIL,
- e.getCode());
+ Assert.assertTrue(e.getMessage().startsWith("SubCluster SC not found"));
}
}
@@ -149,9 +161,8 @@ public void testGetSubClusterInfoUnknownSubCluster() throws Exception {
stateStore.getSubCluster(request).getSubClusterInfo();
Assert.fail();
} catch (FederationStateStoreException e) {
- Assert.assertEquals(
- FederationStateStoreErrorCode.MEMBERSHIP_SINGLE_SELECT_FAIL,
- e.getCode());
+ Assert.assertTrue(
+ e.getMessage().startsWith("SubCluster SC does not exist"));
}
}
@@ -200,13 +211,24 @@ public void testSubClusterHeartbeat() throws Exception {
SubClusterId subClusterId = SubClusterId.newInstance("SC");
registerSubCluster(createSubClusterInfo(subClusterId));
+ long previousHeartBeat =
+ querySubClusterInfo(subClusterId).getLastHeartBeat();
+
SubClusterHeartbeatRequest heartbeatRequest = SubClusterHeartbeatRequest
.newInstance(subClusterId, SubClusterState.SC_RUNNING, "capability");
stateStore.subClusterHeartbeat(heartbeatRequest);
+ long currentTimeStamp =
+ Calendar.getInstance(TimeZone.getTimeZone("UTC")).getTimeInMillis();
+
Assert.assertEquals(SubClusterState.SC_RUNNING,
querySubClusterInfo(subClusterId).getState());
- Assert.assertNotNull(querySubClusterInfo(subClusterId).getLastHeartBeat());
+
+ // The saved heartbeat is between the old one and the current timestamp
+ Assert.assertTrue(querySubClusterInfo(subClusterId)
+ .getLastHeartBeat() <= currentTimeStamp);
+ Assert.assertTrue(querySubClusterInfo(subClusterId)
+ .getLastHeartBeat() >= previousHeartBeat);
}
@Test
@@ -219,9 +241,8 @@ public void testSubClusterHeartbeatUnknownSubCluster() throws Exception {
stateStore.subClusterHeartbeat(heartbeatRequest);
Assert.fail();
} catch (FederationStateStoreException e) {
- Assert.assertEquals(
- FederationStateStoreErrorCode.MEMBERSHIP_UPDATE_HEARTBEAT_FAIL,
- e.getCode());
+ Assert.assertTrue(e.getMessage()
+ .startsWith("SubCluster SC does not exist; cannot heartbeat"));
}
}
@@ -281,9 +302,8 @@ public void testDeleteApplicationHomeSubCluster() throws Exception {
queryApplicationHomeSC(appId);
Assert.fail();
} catch (FederationStateStoreException e) {
- Assert.assertEquals(
- FederationStateStoreErrorCode.APPLICATIONS_SINGLE_SELECT_FAIL,
- e.getCode());
+ Assert.assertTrue(e.getMessage()
+ .startsWith("Application " + appId + " does not exist"));
}
}
@@ -298,8 +318,8 @@ public void testDeleteApplicationHomeSubClusterUnknownApp() throws Exception {
stateStore.deleteApplicationHomeSubCluster(delRequest);
Assert.fail();
} catch (FederationStateStoreException e) {
- Assert.assertEquals(
- FederationStateStoreErrorCode.APPLICATIONS_DELETE_FAIL, e.getCode());
+ Assert.assertTrue(e.getMessage()
+ .startsWith("Application " + appId.toString() + " does not exist"));
}
}
@@ -331,9 +351,8 @@ public void testGetApplicationHomeSubClusterUnknownApp() throws Exception {
stateStore.getApplicationHomeSubCluster(request);
Assert.fail();
} catch (FederationStateStoreException e) {
- Assert.assertEquals(
- FederationStateStoreErrorCode.APPLICATIONS_SINGLE_SELECT_FAIL,
- e.getCode());
+ Assert.assertTrue(e.getMessage()
+ .startsWith("Application " + appId.toString() + " does not exist"));
}
}
@@ -397,8 +416,8 @@ public void testUpdateApplicationHomeSubClusterUnknownApp() throws Exception {
stateStore.updateApplicationHomeSubCluster((updateRequest));
Assert.fail();
} catch (FederationStateStoreException e) {
- Assert.assertEquals(
- FederationStateStoreErrorCode.APPLICATIONS_UPDATE_FAIL, e.getCode());
+ Assert.assertTrue(e.getMessage()
+ .startsWith("Application " + appId.toString() + " does not exist"));
}
}
@@ -458,8 +477,8 @@ public void testGetPolicyConfigurationUnknownQueue() throws Exception {
stateStore.getPolicyConfiguration(request);
Assert.fail();
} catch (FederationStateStoreException e) {
- Assert.assertEquals(
- FederationStateStoreErrorCode.POLICY_SINGLE_SELECT_FAIL, e.getCode());
+ Assert.assertTrue(
+ e.getMessage().startsWith("Policy for queue Queue does not exist"));
}
}
@@ -499,8 +518,9 @@ private SubClusterInfo createSubClusterInfo(SubClusterId subClusterId) {
private SubClusterPolicyConfiguration createSCPolicyConf(String queueName,
String policyType) {
- return SubClusterPolicyConfiguration.newInstance(queueName, policyType,
- ByteBuffer.allocate(1));
+ ByteBuffer bb = ByteBuffer.allocate(100);
+ bb.put((byte) 0x02);
+ return SubClusterPolicyConfiguration.newInstance(queueName, policyType, bb);
}
private void addApplicationHomeSC(ApplicationId appId,
@@ -558,4 +578,8 @@ protected void setConf(Configuration conf) {
this.conf = conf;
}
+ protected Configuration getConf() {
+ return conf;
+ }
+
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/HSQLDBFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/HSQLDBFederationStateStore.java
new file mode 100644
index 0000000000..289a3a6112
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/HSQLDBFederationStateStore.java
@@ -0,0 +1,252 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.store.impl;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * HSQLDB implementation of {@link FederationStateStore}.
+ */
+public class HSQLDBFederationStateStore extends SQLFederationStateStore {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(HSQLDBFederationStateStore.class);
+
+ private Connection conn;
+
+ private static final String TABLE_APPLICATIONSHOMESUBCLUSTER =
+ " CREATE TABLE applicationsHomeSubCluster ("
+ + " applicationId varchar(64) NOT NULL,"
+ + " homeSubCluster varchar(256) NOT NULL,"
+ + " CONSTRAINT pk_applicationId PRIMARY KEY (applicationId))";
+
+ private static final String TABLE_MEMBERSHIP =
+ "CREATE TABLE membership ( subClusterId varchar(256) NOT NULL,"
+ + " amRMServiceAddress varchar(256) NOT NULL,"
+ + " clientRMServiceAddress varchar(256) NOT NULL,"
+ + " rmAdminServiceAddress varchar(256) NOT NULL,"
+ + " rmWebServiceAddress varchar(256) NOT NULL,"
+ + " lastHeartBeat datetime NOT NULL, state varchar(32) NOT NULL,"
+ + " lastStartTime bigint NULL, capability varchar(6000) NOT NULL,"
+ + " CONSTRAINT pk_subClusterId PRIMARY KEY (subClusterId))";
+
+ private static final String TABLE_POLICIES =
+ "CREATE TABLE policies ( queue varchar(256) NOT NULL,"
+ + " policyType varchar(256) NOT NULL, params varbinary(512),"
+ + " CONSTRAINT pk_queue PRIMARY KEY (queue))";
+
+ private static final String SP_REGISTERSUBCLUSTER =
+ "CREATE PROCEDURE sp_registerSubCluster("
+ + " IN subClusterId_IN varchar(256),"
+ + " IN amRMServiceAddress_IN varchar(256),"
+ + " IN clientRMServiceAddress_IN varchar(256),"
+ + " IN rmAdminServiceAddress_IN varchar(256),"
+ + " IN rmWebServiceAddress_IN varchar(256),"
+ + " IN state_IN varchar(256),"
+ + " IN lastStartTime_IN bigint, IN capability_IN varchar(6000),"
+ + " OUT rowCount_OUT int)MODIFIES SQL DATA BEGIN ATOMIC"
+ + " DELETE FROM membership WHERE (subClusterId = subClusterId_IN);"
+ + " INSERT INTO membership ( subClusterId,"
+ + " amRMServiceAddress, clientRMServiceAddress,"
+ + " rmAdminServiceAddress, rmWebServiceAddress,"
+ + " lastHeartBeat, state, lastStartTime,"
+ + " capability) VALUES ( subClusterId_IN,"
+ + " amRMServiceAddress_IN, clientRMServiceAddress_IN,"
+ + " rmAdminServiceAddress_IN, rmWebServiceAddress_IN,"
+ + " NOW() AT TIME ZONE INTERVAL '0:00' HOUR TO MINUTE,"
+ + " state_IN, lastStartTime_IN, capability_IN);"
+ + " GET DIAGNOSTICS rowCount_OUT = ROW_COUNT; END";
+
+ private static final String SP_DEREGISTERSUBCLUSTER =
+ "CREATE PROCEDURE sp_deregisterSubCluster("
+ + " IN subClusterId_IN varchar(256),"
+ + " IN state_IN varchar(64), OUT rowCount_OUT int)"
+ + " MODIFIES SQL DATA BEGIN ATOMIC"
+ + " UPDATE membership SET state = state_IN WHERE ("
+ + " subClusterId = subClusterId_IN AND state != state_IN);"
+ + " GET DIAGNOSTICS rowCount_OUT = ROW_COUNT; END";
+
+ private static final String SP_SUBCLUSTERHEARTBEAT =
+ "CREATE PROCEDURE sp_subClusterHeartbeat("
+ + " IN subClusterId_IN varchar(256), IN state_IN varchar(64),"
+ + " IN capability_IN varchar(6000), OUT rowCount_OUT int)"
+ + " MODIFIES SQL DATA BEGIN ATOMIC UPDATE membership"
+ + " SET capability = capability_IN, state = state_IN,"
+ + " lastHeartBeat = NOW() AT TIME ZONE INTERVAL '0:00'"
+ + " HOUR TO MINUTE WHERE subClusterId = subClusterId_IN;"
+ + " GET DIAGNOSTICS rowCount_OUT = ROW_COUNT; END";
+
+ private static final String SP_GETSUBCLUSTER =
+ "CREATE PROCEDURE sp_getSubCluster( IN subClusterId_IN varchar(256),"
+ + " OUT amRMServiceAddress_OUT varchar(256),"
+ + " OUT clientRMServiceAddress_OUT varchar(256),"
+ + " OUT rmAdminServiceAddress_OUT varchar(256),"
+ + " OUT rmWebServiceAddress_OUT varchar(256),"
+ + " OUT lastHeartBeat_OUT datetime, OUT state_OUT varchar(64),"
+ + " OUT lastStartTime_OUT bigint,"
+ + " OUT capability_OUT varchar(6000))"
+ + " MODIFIES SQL DATA BEGIN ATOMIC SELECT amRMServiceAddress,"
+ + " clientRMServiceAddress,"
+ + " rmAdminServiceAddress, rmWebServiceAddress,"
+ + " lastHeartBeat, state, lastStartTime, capability"
+ + " INTO amRMServiceAddress_OUT, clientRMServiceAddress_OUT,"
+ + " rmAdminServiceAddress_OUT,"
+ + " rmWebServiceAddress_OUT, lastHeartBeat_OUT,"
+ + " state_OUT, lastStartTime_OUT, capability_OUT"
+ + " FROM membership WHERE subClusterId = subClusterId_IN; END";
+
+ private static final String SP_GETSUBCLUSTERS =
+ "CREATE PROCEDURE sp_getSubClusters()"
+ + " MODIFIES SQL DATA DYNAMIC RESULT SETS 1 BEGIN ATOMIC"
+ + " DECLARE result CURSOR FOR"
+ + " SELECT subClusterId, amRMServiceAddress, clientRMServiceAddress,"
+ + " rmAdminServiceAddress, rmWebServiceAddress, lastHeartBeat,"
+ + " state, lastStartTime, capability"
+ + " FROM membership; OPEN result; END";
+
+ private static final String SP_ADDAPPLICATIONHOMESUBCLUSTER =
+ "CREATE PROCEDURE sp_addApplicationHomeSubCluster("
+ + " IN applicationId_IN varchar(64),"
+ + " IN homeSubCluster_IN varchar(256),"
+ + " OUT storedHomeSubCluster_OUT varchar(256), OUT rowCount_OUT int)"
+ + " MODIFIES SQL DATA BEGIN ATOMIC"
+ + " INSERT INTO applicationsHomeSubCluster "
+ + " (applicationId,homeSubCluster) "
+ + " (SELECT applicationId_IN, homeSubCluster_IN"
+ + " FROM applicationsHomeSubCluster"
+ + " WHERE applicationId = applicationId_IN"
+ + " HAVING COUNT(*) = 0 );"
+ + " GET DIAGNOSTICS rowCount_OUT = ROW_COUNT;"
+ + " SELECT homeSubCluster INTO storedHomeSubCluster_OUT"
+ + " FROM applicationsHomeSubCluster"
+ + " WHERE applicationId = applicationID_IN; END";
+
+ private static final String SP_UPDATEAPPLICATIONHOMESUBCLUSTER =
+ "CREATE PROCEDURE sp_updateApplicationHomeSubCluster("
+ + " IN applicationId_IN varchar(64),"
+ + " IN homeSubCluster_IN varchar(256), OUT rowCount_OUT int)"
+ + " MODIFIES SQL DATA BEGIN ATOMIC"
+ + " UPDATE applicationsHomeSubCluster"
+ + " SET homeSubCluster = homeSubCluster_IN"
+ + " WHERE applicationId = applicationId_IN;"
+ + " GET DIAGNOSTICS rowCount_OUT = ROW_COUNT; END";
+
+ private static final String SP_GETAPPLICATIONHOMESUBCLUSTER =
+ "CREATE PROCEDURE sp_getApplicationHomeSubCluster("
+ + " IN applicationId_IN varchar(64),"
+ + " OUT homeSubCluster_OUT varchar(256))"
+ + " MODIFIES SQL DATA BEGIN ATOMIC"
+ + " SELECT homeSubCluster INTO homeSubCluster_OUT"
+ + " FROM applicationsHomeSubCluster"
+ + " WHERE applicationId = applicationID_IN; END";
+
+ private static final String SP_GETAPPLICATIONSHOMESUBCLUSTER =
+ "CREATE PROCEDURE sp_getApplicationsHomeSubCluster()"
+ + " MODIFIES SQL DATA DYNAMIC RESULT SETS 1 BEGIN ATOMIC"
+ + " DECLARE result CURSOR FOR"
+ + " SELECT applicationId, homeSubCluster"
+ + " FROM applicationsHomeSubCluster; OPEN result; END";
+
+ private static final String SP_DELETEAPPLICATIONHOMESUBCLUSTER =
+ "CREATE PROCEDURE sp_deleteApplicationHomeSubCluster("
+ + " IN applicationId_IN varchar(64), OUT rowCount_OUT int)"
+ + " MODIFIES SQL DATA BEGIN ATOMIC"
+ + " DELETE FROM applicationsHomeSubCluster"
+ + " WHERE applicationId = applicationId_IN;"
+ + " GET DIAGNOSTICS rowCount_OUT = ROW_COUNT; END";
+
+ private static final String SP_SETPOLICYCONFIGURATION =
+ "CREATE PROCEDURE sp_setPolicyConfiguration("
+ + " IN queue_IN varchar(256), IN policyType_IN varchar(256),"
+ + " IN params_IN varbinary(512), OUT rowCount_OUT int)"
+ + " MODIFIES SQL DATA BEGIN ATOMIC"
+ + " DELETE FROM policies WHERE queue = queue_IN;"
+ + " INSERT INTO policies (queue, policyType, params)"
+ + " VALUES (queue_IN, policyType_IN, params_IN);"
+ + " GET DIAGNOSTICS rowCount_OUT = ROW_COUNT; END";
+
+ private static final String SP_GETPOLICYCONFIGURATION =
+ "CREATE PROCEDURE sp_getPolicyConfiguration("
+ + " IN queue_IN varchar(256), OUT policyType_OUT varchar(256),"
+ + " OUT params_OUT varbinary(512)) MODIFIES SQL DATA BEGIN ATOMIC"
+ + " SELECT policyType, params INTO policyType_OUT, params_OUT"
+ + " FROM policies WHERE queue = queue_IN; END";
+
+ private static final String SP_GETPOLICIESCONFIGURATIONS =
+ "CREATE PROCEDURE sp_getPoliciesConfigurations()"
+ + " MODIFIES SQL DATA DYNAMIC RESULT SETS 1 BEGIN ATOMIC"
+ + " DECLARE result CURSOR FOR"
+ + " SELECT * FROM policies; OPEN result; END";
+
+ @Override
+ public void init(Configuration conf) {
+ try {
+ super.init(conf);
+ } catch (YarnException e1) {
+ LOG.error("ERROR: failed to init HSQLDB " + e1.getMessage());
+ }
+ try {
+ conn = getConnection();
+
+ LOG.info("Database Init: Start");
+
+ conn.prepareStatement(TABLE_APPLICATIONSHOMESUBCLUSTER).execute();
+ conn.prepareStatement(TABLE_MEMBERSHIP).execute();
+ conn.prepareStatement(TABLE_POLICIES).execute();
+
+ conn.prepareStatement(SP_REGISTERSUBCLUSTER).execute();
+ conn.prepareStatement(SP_DEREGISTERSUBCLUSTER).execute();
+ conn.prepareStatement(SP_SUBCLUSTERHEARTBEAT).execute();
+ conn.prepareStatement(SP_GETSUBCLUSTER).execute();
+ conn.prepareStatement(SP_GETSUBCLUSTERS).execute();
+
+ conn.prepareStatement(SP_ADDAPPLICATIONHOMESUBCLUSTER).execute();
+ conn.prepareStatement(SP_UPDATEAPPLICATIONHOMESUBCLUSTER).execute();
+ conn.prepareStatement(SP_GETAPPLICATIONHOMESUBCLUSTER).execute();
+ conn.prepareStatement(SP_GETAPPLICATIONSHOMESUBCLUSTER).execute();
+ conn.prepareStatement(SP_DELETEAPPLICATIONHOMESUBCLUSTER).execute();
+
+ conn.prepareStatement(SP_SETPOLICYCONFIGURATION).execute();
+ conn.prepareStatement(SP_GETPOLICYCONFIGURATION).execute();
+ conn.prepareStatement(SP_GETPOLICIESCONFIGURATIONS).execute();
+
+ LOG.info("Database Init: Complete");
+ conn.close();
+ } catch (SQLException e) {
+ LOG.error("ERROR: failed to inizialize HSQLDB " + e.getMessage());
+ }
+ }
+
+ public void closeConnection() {
+ try {
+ conn.close();
+ } catch (SQLException e) {
+ LOG.error(
+ "ERROR: failed to close connection to HSQLDB DB " + e.getMessage());
+ }
+ }
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestMemoryFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestMemoryFederationStateStore.java
index 64adab8394..c29fc03c5b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestMemoryFederationStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestMemoryFederationStateStore.java
@@ -28,7 +28,8 @@ public class TestMemoryFederationStateStore
@Override
protected FederationStateStore createStateStore() {
- super.setConf(new Configuration());
+ Configuration conf = new Configuration();
+ super.setConf(conf);
return new MemoryFederationStateStore();
}
}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestSQLFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestSQLFederationStateStore.java
new file mode 100644
index 0000000000..d4e6cc53f6
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestSQLFederationStateStore.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.yarn.server.federation.store.impl; + +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; + +/** + * Unit tests for SQLFederationStateStore. + */ +public class TestSQLFederationStateStore extends FederationStateStoreBaseTest { + + private static final String HSQLDB_DRIVER = "org.hsqldb.jdbc.JDBCDataSource"; + private static final String DATABASE_URL = "jdbc:hsqldb:mem:state"; + private static final String DATABASE_USERNAME = "SA"; + private static final String DATABASE_PASSWORD = ""; + + @Override + protected FederationStateStore createStateStore() { + + YarnConfiguration conf = new YarnConfiguration(); + + conf.set(YarnConfiguration.FEDERATION_STATESTORE_SQL_JDBC_CLASS, + HSQLDB_DRIVER); + conf.set(YarnConfiguration.FEDERATION_STATESTORE_SQL_USERNAME, + DATABASE_USERNAME); + conf.set(YarnConfiguration.FEDERATION_STATESTORE_SQL_PASSWORD, + DATABASE_PASSWORD); + conf.set(YarnConfiguration.FEDERATION_STATESTORE_SQL_URL, + DATABASE_URL + System.currentTimeMillis()); + super.setConf(conf); + return new HSQLDBFederationStateStore(); + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/utils/TestFederationStateStoreInputValidator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/utils/TestFederationStateStoreInputValidator.java index 8ac5e81e27..5a5703e6f6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/utils/TestFederationStateStoreInputValidator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/utils/TestFederationStateStoreInputValidator.java @@ -145,7 +145,7 @@ public void testValidateSubClusterRegisterRequest() { SubClusterRegisterRequest request = SubClusterRegisterRequest.newInstance(subClusterInfo); FederationMembershipStateStoreInputValidator - .validateSubClusterRegisterRequest(request); + .validate(request); } catch (FederationStateStoreInvalidInputException e) { Assert.fail(e.getMessage()); } @@ -155,7 +155,7 @@ public void testValidateSubClusterRegisterRequest() { try { SubClusterRegisterRequest request = null; FederationMembershipStateStoreInputValidator - .validateSubClusterRegisterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -170,7 +170,7 @@ public void testValidateSubClusterRegisterRequest() { SubClusterRegisterRequest request = SubClusterRegisterRequest.newInstance(subClusterInfo); FederationMembershipStateStoreInputValidator - .validateSubClusterRegisterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -188,7 +188,7 @@ public void testValidateSubClusterRegisterRequest() { SubClusterRegisterRequest request = SubClusterRegisterRequest.newInstance(subClusterInfo); FederationMembershipStateStoreInputValidator - .validateSubClusterRegisterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -206,7 +206,7 @@ public void testValidateSubClusterRegisterRequest() { SubClusterRegisterRequest request = SubClusterRegisterRequest.newInstance(subClusterInfo); FederationMembershipStateStoreInputValidator - .validateSubClusterRegisterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -224,7 +224,7 @@ public void testValidateSubClusterRegisterRequest() { SubClusterRegisterRequest request = SubClusterRegisterRequest.newInstance(subClusterInfo); FederationMembershipStateStoreInputValidator - .validateSubClusterRegisterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -242,7 +242,7 @@ public void testValidateSubClusterRegisterRequest() { SubClusterRegisterRequest request = SubClusterRegisterRequest.newInstance(subClusterInfo); FederationMembershipStateStoreInputValidator - .validateSubClusterRegisterRequest(request); + .validate(request); } catch (FederationStateStoreInvalidInputException e) { Assert.fail(e.getMessage()); } @@ -257,7 +257,7 @@ public void testValidateSubClusterRegisterRequest() { SubClusterRegisterRequest request = SubClusterRegisterRequest.newInstance(subClusterInfo); FederationMembershipStateStoreInputValidator - .validateSubClusterRegisterRequest(request); + .validate(request); } catch (FederationStateStoreInvalidInputException e) { Assert.fail(e.getMessage()); } @@ -276,7 +276,7 @@ public void testValidateSubClusterRegisterRequestTimestamp() { SubClusterRegisterRequest request = SubClusterRegisterRequest.newInstance(subClusterInfo); FederationMembershipStateStoreInputValidator - .validateSubClusterRegisterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -294,7 +294,7 @@ public void testValidateSubClusterRegisterRequestTimestamp() { SubClusterRegisterRequest request = SubClusterRegisterRequest.newInstance(subClusterInfo); FederationMembershipStateStoreInputValidator - .validateSubClusterRegisterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -315,7 +315,7 @@ public void testValidateSubClusterRegisterRequestAddress() { SubClusterRegisterRequest request = SubClusterRegisterRequest.newInstance(subClusterInfo); FederationMembershipStateStoreInputValidator - .validateSubClusterRegisterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -332,7 +332,7 @@ public void testValidateSubClusterRegisterRequestAddress() { SubClusterRegisterRequest request = SubClusterRegisterRequest.newInstance(subClusterInfo); FederationMembershipStateStoreInputValidator - .validateSubClusterRegisterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -350,7 +350,7 @@ public void testValidateSubClusterRegisterRequestAddress() { SubClusterRegisterRequest request = SubClusterRegisterRequest.newInstance(subClusterInfo); FederationMembershipStateStoreInputValidator - .validateSubClusterRegisterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -368,7 +368,7 @@ public void testValidateSubClusterRegisterRequestAddress() { SubClusterRegisterRequest request = SubClusterRegisterRequest.newInstance(subClusterInfo); FederationMembershipStateStoreInputValidator - .validateSubClusterRegisterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -386,7 +386,7 @@ public void testValidateSubClusterRegisterRequestAddress() { SubClusterRegisterRequest request = SubClusterRegisterRequest.newInstance(subClusterInfo); FederationMembershipStateStoreInputValidator - .validateSubClusterRegisterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -404,7 +404,7 @@ public void testValidateSubClusterRegisterRequestAddress() { SubClusterRegisterRequest request = SubClusterRegisterRequest.newInstance(subClusterInfo); FederationMembershipStateStoreInputValidator - .validateSubClusterRegisterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -421,7 +421,7 @@ public void testValidateSubClusterRegisterRequestAddress() { SubClusterRegisterRequest request = SubClusterRegisterRequest.newInstance(subClusterInfo); FederationMembershipStateStoreInputValidator - .validateSubClusterRegisterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -438,7 +438,7 @@ public void testValidateSubClusterRegisterRequestAddress() { SubClusterRegisterRequest request = SubClusterRegisterRequest.newInstance(subClusterInfo); FederationMembershipStateStoreInputValidator - .validateSubClusterRegisterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -460,7 +460,7 @@ public void testValidateSubClusterRegisterRequestAddressInvalid() { SubClusterRegisterRequest request = SubClusterRegisterRequest.newInstance(subClusterInfo); FederationMembershipStateStoreInputValidator - .validateSubClusterRegisterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -477,7 +477,7 @@ public void testValidateSubClusterRegisterRequestAddressInvalid() { SubClusterRegisterRequest request = SubClusterRegisterRequest.newInstance(subClusterInfo); FederationMembershipStateStoreInputValidator - .validateSubClusterRegisterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -494,7 +494,7 @@ public void testValidateSubClusterRegisterRequestAddressInvalid() { SubClusterRegisterRequest request = SubClusterRegisterRequest.newInstance(subClusterInfo); FederationMembershipStateStoreInputValidator - .validateSubClusterRegisterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -510,7 +510,7 @@ public void testValidateSubClusterRegisterRequestAddressInvalid() { SubClusterRegisterRequest request = SubClusterRegisterRequest.newInstance(subClusterInfo); FederationMembershipStateStoreInputValidator - .validateSubClusterRegisterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -526,7 +526,7 @@ public void testValidateSubClusterRegisterRequestAddressInvalid() { SubClusterRegisterRequest request = SubClusterRegisterRequest.newInstance(subClusterInfo); FederationMembershipStateStoreInputValidator - .validateSubClusterRegisterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -543,7 +543,7 @@ public void testValidateSubClusterRegisterRequestAddressInvalid() { SubClusterRegisterRequest request = SubClusterRegisterRequest.newInstance(subClusterInfo); FederationMembershipStateStoreInputValidator - .validateSubClusterRegisterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -560,7 +560,7 @@ public void testValidateSubClusterRegisterRequestAddressInvalid() { SubClusterRegisterRequest request = SubClusterRegisterRequest.newInstance(subClusterInfo); FederationMembershipStateStoreInputValidator - .validateSubClusterRegisterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -576,7 +576,7 @@ public void testValidateSubClusterRegisterRequestAddressInvalid() { SubClusterRegisterRequest request = SubClusterRegisterRequest.newInstance(subClusterInfo); FederationMembershipStateStoreInputValidator - .validateSubClusterRegisterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -594,7 +594,7 @@ public void testValidateSubClusterDeregisterRequest() { SubClusterDeregisterRequest request = SubClusterDeregisterRequest.newInstance(subClusterId, stateLost); FederationMembershipStateStoreInputValidator - .validateSubClusterDeregisterRequest(request); + .validate(request); } catch (FederationStateStoreInvalidInputException e) { Assert.fail(e.getMessage()); } @@ -604,7 +604,7 @@ public void testValidateSubClusterDeregisterRequest() { try { SubClusterDeregisterRequest request = null; FederationMembershipStateStoreInputValidator - .validateSubClusterDeregisterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -618,7 +618,7 @@ public void testValidateSubClusterDeregisterRequest() { SubClusterDeregisterRequest request = SubClusterDeregisterRequest.newInstance(subClusterIdNull, stateLost); FederationMembershipStateStoreInputValidator - .validateSubClusterDeregisterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -632,7 +632,7 @@ public void testValidateSubClusterDeregisterRequest() { SubClusterDeregisterRequest request = SubClusterDeregisterRequest .newInstance(subClusterIdInvalid, stateLost); FederationMembershipStateStoreInputValidator - .validateSubClusterDeregisterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -646,7 +646,7 @@ public void testValidateSubClusterDeregisterRequest() { SubClusterDeregisterRequest request = SubClusterDeregisterRequest.newInstance(subClusterId, stateNull); FederationMembershipStateStoreInputValidator - .validateSubClusterDeregisterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -660,7 +660,7 @@ public void testValidateSubClusterDeregisterRequest() { SubClusterDeregisterRequest request = SubClusterDeregisterRequest.newInstance(subClusterId, stateNew); FederationMembershipStateStoreInputValidator - .validateSubClusterDeregisterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -677,7 +677,7 @@ public void testSubClusterHeartbeatRequest() { SubClusterHeartbeatRequest request = SubClusterHeartbeatRequest .newInstance(subClusterId, lastHeartBeat, stateLost, capability); FederationMembershipStateStoreInputValidator - .validateSubClusterHeartbeatRequest(request); + .validate(request); } catch (FederationStateStoreInvalidInputException e) { Assert.fail(e.getMessage()); } @@ -687,7 +687,7 @@ public void testSubClusterHeartbeatRequest() { try { SubClusterHeartbeatRequest request = null; FederationMembershipStateStoreInputValidator - .validateSubClusterHeartbeatRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -701,7 +701,7 @@ public void testSubClusterHeartbeatRequest() { SubClusterHeartbeatRequest request = SubClusterHeartbeatRequest .newInstance(subClusterIdNull, lastHeartBeat, stateLost, capability); FederationMembershipStateStoreInputValidator - .validateSubClusterHeartbeatRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -716,7 +716,7 @@ public void testSubClusterHeartbeatRequest() { SubClusterHeartbeatRequest.newInstance(subClusterIdInvalid, lastHeartBeat, stateLost, capability); FederationMembershipStateStoreInputValidator - .validateSubClusterHeartbeatRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -730,7 +730,7 @@ public void testSubClusterHeartbeatRequest() { SubClusterHeartbeatRequest request = SubClusterHeartbeatRequest .newInstance(subClusterId, lastHeartBeat, stateNull, capability); FederationMembershipStateStoreInputValidator - .validateSubClusterHeartbeatRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -745,7 +745,7 @@ public void testSubClusterHeartbeatRequest() { SubClusterHeartbeatRequest.newInstance(subClusterId, lastHeartBeatNegative, stateLost, capability); FederationMembershipStateStoreInputValidator - .validateSubClusterHeartbeatRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -759,7 +759,7 @@ public void testSubClusterHeartbeatRequest() { SubClusterHeartbeatRequest request = SubClusterHeartbeatRequest .newInstance(subClusterId, lastHeartBeat, stateLost, capabilityNull); FederationMembershipStateStoreInputValidator - .validateSubClusterHeartbeatRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -773,7 +773,7 @@ public void testSubClusterHeartbeatRequest() { SubClusterHeartbeatRequest request = SubClusterHeartbeatRequest .newInstance(subClusterId, lastHeartBeat, stateLost, capabilityEmpty); FederationMembershipStateStoreInputValidator - .validateSubClusterHeartbeatRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -791,7 +791,7 @@ public void testGetSubClusterInfoRequest() { GetSubClusterInfoRequest request = GetSubClusterInfoRequest.newInstance(subClusterId); FederationMembershipStateStoreInputValidator - .validateGetSubClusterInfoRequest(request); + .validate(request); } catch (FederationStateStoreInvalidInputException e) { Assert.fail(e.getMessage()); } @@ -801,7 +801,7 @@ public void testGetSubClusterInfoRequest() { try { GetSubClusterInfoRequest request = null; FederationMembershipStateStoreInputValidator - .validateGetSubClusterInfoRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -815,7 +815,7 @@ public void testGetSubClusterInfoRequest() { GetSubClusterInfoRequest request = GetSubClusterInfoRequest.newInstance(subClusterIdNull); FederationMembershipStateStoreInputValidator - .validateGetSubClusterInfoRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -829,7 +829,7 @@ public void testGetSubClusterInfoRequest() { GetSubClusterInfoRequest request = GetSubClusterInfoRequest.newInstance(subClusterIdInvalid); FederationMembershipStateStoreInputValidator - .validateGetSubClusterInfoRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -850,7 +850,7 @@ public void testAddApplicationHomeSubClusterRequest() { AddApplicationHomeSubClusterRequest .newInstance(applicationHomeSubCluster); FederationApplicationHomeSubClusterStoreInputValidator - .validateAddApplicationHomeSubClusterRequest(request); + .validate(request); } catch (FederationStateStoreInvalidInputException e) { Assert.fail(e.getMessage()); } @@ -860,7 +860,7 @@ public void testAddApplicationHomeSubClusterRequest() { try { AddApplicationHomeSubClusterRequest request = null; FederationApplicationHomeSubClusterStoreInputValidator - .validateAddApplicationHomeSubClusterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { Assert.assertTrue(e.getMessage() @@ -875,7 +875,7 @@ public void testAddApplicationHomeSubClusterRequest() { AddApplicationHomeSubClusterRequest .newInstance(applicationHomeSubCluster); FederationApplicationHomeSubClusterStoreInputValidator - .validateAddApplicationHomeSubClusterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { Assert.assertTrue( @@ -891,7 +891,7 @@ public void testAddApplicationHomeSubClusterRequest() { AddApplicationHomeSubClusterRequest .newInstance(applicationHomeSubCluster); FederationApplicationHomeSubClusterStoreInputValidator - .validateAddApplicationHomeSubClusterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -908,7 +908,7 @@ public void testAddApplicationHomeSubClusterRequest() { AddApplicationHomeSubClusterRequest .newInstance(applicationHomeSubCluster); FederationApplicationHomeSubClusterStoreInputValidator - .validateAddApplicationHomeSubClusterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -925,7 +925,7 @@ public void testAddApplicationHomeSubClusterRequest() { AddApplicationHomeSubClusterRequest .newInstance(applicationHomeSubCluster); FederationApplicationHomeSubClusterStoreInputValidator - .validateAddApplicationHomeSubClusterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { Assert.assertTrue(e.getMessage().startsWith("Missing Application Id.")); @@ -944,7 +944,7 @@ public void testUpdateApplicationHomeSubClusterRequest() { UpdateApplicationHomeSubClusterRequest .newInstance(applicationHomeSubCluster); FederationApplicationHomeSubClusterStoreInputValidator - .validateUpdateApplicationHomeSubClusterRequest(request); + .validate(request); } catch (FederationStateStoreInvalidInputException e) { Assert.fail(e.getMessage()); } @@ -954,7 +954,7 @@ public void testUpdateApplicationHomeSubClusterRequest() { try { UpdateApplicationHomeSubClusterRequest request = null; FederationApplicationHomeSubClusterStoreInputValidator - .validateUpdateApplicationHomeSubClusterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { Assert.assertTrue(e.getMessage() @@ -969,7 +969,7 @@ public void testUpdateApplicationHomeSubClusterRequest() { UpdateApplicationHomeSubClusterRequest .newInstance(applicationHomeSubCluster); FederationApplicationHomeSubClusterStoreInputValidator - .validateUpdateApplicationHomeSubClusterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { Assert.assertTrue( @@ -985,7 +985,7 @@ public void testUpdateApplicationHomeSubClusterRequest() { UpdateApplicationHomeSubClusterRequest .newInstance(applicationHomeSubCluster); FederationApplicationHomeSubClusterStoreInputValidator - .validateUpdateApplicationHomeSubClusterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -1002,7 +1002,7 @@ public void testUpdateApplicationHomeSubClusterRequest() { UpdateApplicationHomeSubClusterRequest .newInstance(applicationHomeSubCluster); FederationApplicationHomeSubClusterStoreInputValidator - .validateUpdateApplicationHomeSubClusterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -1019,7 +1019,7 @@ public void testUpdateApplicationHomeSubClusterRequest() { UpdateApplicationHomeSubClusterRequest .newInstance(applicationHomeSubCluster); FederationApplicationHomeSubClusterStoreInputValidator - .validateUpdateApplicationHomeSubClusterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { Assert.assertTrue(e.getMessage().startsWith("Missing Application Id.")); @@ -1035,7 +1035,7 @@ public void testGetApplicationHomeSubClusterRequest() { GetApplicationHomeSubClusterRequest request = GetApplicationHomeSubClusterRequest.newInstance(appId); FederationApplicationHomeSubClusterStoreInputValidator - .validateGetApplicationHomeSubClusterRequest(request); + .validate(request); } catch (FederationStateStoreInvalidInputException e) { Assert.fail(e.getMessage()); } @@ -1045,7 +1045,7 @@ public void testGetApplicationHomeSubClusterRequest() { try { GetApplicationHomeSubClusterRequest request = null; FederationApplicationHomeSubClusterStoreInputValidator - .validateGetApplicationHomeSubClusterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { Assert.assertTrue(e.getMessage() @@ -1058,7 +1058,7 @@ public void testGetApplicationHomeSubClusterRequest() { GetApplicationHomeSubClusterRequest request = GetApplicationHomeSubClusterRequest.newInstance(appIdNull); FederationApplicationHomeSubClusterStoreInputValidator - .validateGetApplicationHomeSubClusterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { Assert.assertTrue(e.getMessage().startsWith("Missing Application Id.")); @@ -1075,7 +1075,7 @@ public void testDeleteApplicationHomeSubClusterRequestNull() { DeleteApplicationHomeSubClusterRequest request = DeleteApplicationHomeSubClusterRequest.newInstance(appId); FederationApplicationHomeSubClusterStoreInputValidator - .validateDeleteApplicationHomeSubClusterRequest(request); + .validate(request); } catch (FederationStateStoreInvalidInputException e) { Assert.fail(e.getMessage()); } @@ -1085,7 +1085,7 @@ public void testDeleteApplicationHomeSubClusterRequestNull() { try { DeleteApplicationHomeSubClusterRequest request = null; FederationApplicationHomeSubClusterStoreInputValidator - .validateDeleteApplicationHomeSubClusterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { Assert.assertTrue(e.getMessage() @@ -1098,7 +1098,7 @@ public void testDeleteApplicationHomeSubClusterRequestNull() { DeleteApplicationHomeSubClusterRequest request = DeleteApplicationHomeSubClusterRequest.newInstance(appIdNull); FederationApplicationHomeSubClusterStoreInputValidator - .validateDeleteApplicationHomeSubClusterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { Assert.assertTrue(e.getMessage().startsWith("Missing Application Id.")); @@ -1115,7 +1115,7 @@ public void testGetSubClusterPolicyConfigurationRequest() { GetSubClusterPolicyConfigurationRequest request = GetSubClusterPolicyConfigurationRequest.newInstance(queue); FederationPolicyStoreInputValidator - .validateGetSubClusterPolicyConfigurationRequest(request); + .validate(request); } catch (FederationStateStoreInvalidInputException e) { Assert.fail(e.getMessage()); } @@ -1125,7 +1125,7 @@ public void testGetSubClusterPolicyConfigurationRequest() { try { GetSubClusterPolicyConfigurationRequest request = null; FederationPolicyStoreInputValidator - .validateGetSubClusterPolicyConfigurationRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { Assert.assertTrue(e.getMessage() @@ -1138,7 +1138,7 @@ public void testGetSubClusterPolicyConfigurationRequest() { GetSubClusterPolicyConfigurationRequest request = GetSubClusterPolicyConfigurationRequest.newInstance(queueNull); FederationPolicyStoreInputValidator - .validateGetSubClusterPolicyConfigurationRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { Assert.assertTrue(e.getMessage().startsWith("Missing Queue.")); @@ -1150,7 +1150,7 @@ public void testGetSubClusterPolicyConfigurationRequest() { GetSubClusterPolicyConfigurationRequest request = GetSubClusterPolicyConfigurationRequest.newInstance(queueEmpty); FederationPolicyStoreInputValidator - .validateGetSubClusterPolicyConfigurationRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { Assert.assertTrue(e.getMessage().startsWith("Missing Queue.")); @@ -1169,7 +1169,7 @@ public void testSetSubClusterPolicyConfigurationRequest() { SetSubClusterPolicyConfigurationRequest request = SetSubClusterPolicyConfigurationRequest.newInstance(policy); FederationPolicyStoreInputValidator - .validateSetSubClusterPolicyConfigurationRequest(request); + .validate(request); } catch (FederationStateStoreInvalidInputException e) { Assert.fail(e.getMessage()); } @@ -1179,7 +1179,7 @@ public void testSetSubClusterPolicyConfigurationRequest() { try { SetSubClusterPolicyConfigurationRequest request = null; FederationPolicyStoreInputValidator - .validateSetSubClusterPolicyConfigurationRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { Assert.assertTrue(e.getMessage() @@ -1193,7 +1193,7 @@ public void testSetSubClusterPolicyConfigurationRequest() { SetSubClusterPolicyConfigurationRequest request = SetSubClusterPolicyConfigurationRequest.newInstance(policy); FederationPolicyStoreInputValidator - .validateSetSubClusterPolicyConfigurationRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { Assert.assertTrue( @@ -1208,7 +1208,7 @@ public void testSetSubClusterPolicyConfigurationRequest() { SetSubClusterPolicyConfigurationRequest request = SetSubClusterPolicyConfigurationRequest.newInstance(policy); FederationPolicyStoreInputValidator - .validateSetSubClusterPolicyConfigurationRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { Assert.assertTrue(e.getMessage().startsWith("Missing Queue.")); @@ -1222,7 +1222,7 @@ public void testSetSubClusterPolicyConfigurationRequest() { SetSubClusterPolicyConfigurationRequest request = SetSubClusterPolicyConfigurationRequest.newInstance(policy); FederationPolicyStoreInputValidator - .validateSetSubClusterPolicyConfigurationRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { Assert.assertTrue(e.getMessage().startsWith("Missing Queue.")); @@ -1236,7 +1236,7 @@ public void testSetSubClusterPolicyConfigurationRequest() { SetSubClusterPolicyConfigurationRequest request = SetSubClusterPolicyConfigurationRequest.newInstance(policy); FederationPolicyStoreInputValidator - .validateSetSubClusterPolicyConfigurationRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { Assert.assertTrue(e.getMessage().startsWith("Missing Policy Type.")); @@ -1250,7 +1250,7 @@ public void testSetSubClusterPolicyConfigurationRequest() { SetSubClusterPolicyConfigurationRequest request = SetSubClusterPolicyConfigurationRequest.newInstance(policy); FederationPolicyStoreInputValidator - .validateSetSubClusterPolicyConfigurationRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { Assert.assertTrue(e.getMessage().startsWith("Missing Policy Type.")); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationStateStoreFacadeRetry.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationStateStoreFacadeRetry.java index 632e865d9a..304910eb36 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationStateStoreFacadeRetry.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationStateStoreFacadeRetry.java @@ -24,7 +24,6 @@ import org.apache.hadoop.io.retry.RetryPolicy.RetryAction; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreErrorCode; import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreException; import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreInvalidInputException; import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreRetriableException; @@ -82,10 +81,8 @@ public void testFacadeStateStoreException() throws Exception { conf = new Configuration(); conf.setInt(YarnConfiguration.CLIENT_FAILOVER_RETRIES, maxRetries); RetryPolicy policy = FederationStateStoreFacade.createRetryPolicy(conf); - RetryAction action = policy.shouldRetry( - new FederationStateStoreException( - FederationStateStoreErrorCode.APPLICATIONS_INSERT_FAIL), - 0, 0, false); + RetryAction action = policy + .shouldRetry(new FederationStateStoreException("Error"), 0, 0, false); Assert.assertEquals(RetryAction.FAIL.action, action.action); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/resources/FederationStateStore/SQLServer/FederationStateStoreStoreProcs.sql b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/resources/FederationStateStore/SQLServer/FederationStateStoreStoreProcs.sql new file mode 100644 index 0000000000..66d6f0e203 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/resources/FederationStateStore/SQLServer/FederationStateStoreStoreProcs.sql @@ -0,0 +1,511 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +USE [FederationStateStore] +GO + +IF OBJECT_ID ( '[sp_addApplicationHomeSubCluster]', 'P' ) IS NOT NULL + DROP PROCEDURE [sp_addApplicationHomeSubCluster]; +GO + +CREATE PROCEDURE [dbo].[sp_addApplicationHomeSubCluster] + @applicationId VARCHAR(64), + @homeSubCluster VARCHAR(256), + @storedHomeSubCluster VARCHAR(256) OUTPUT, + @rowCount int OUTPUT +AS BEGIN + DECLARE @errorMessage nvarchar(4000) + + BEGIN TRY + BEGIN TRAN + -- If application to sub-cluster map doesn't exist, insert it. + -- Otherwise don't change the current mapping. + IF NOT EXISTS (SELECT TOP 1 * + FROM [dbo].[applicationsHomeSubCluster] + WHERE [applicationId] = @applicationId) + + INSERT INTO [dbo].[applicationsHomeSubCluster] ( + [applicationId], + [homeSubCluster]) + VALUES ( + @applicationId, + @homeSubCluster); + -- End of the IF block + + SELECT @rowCount = @@ROWCOUNT; + + SELECT @storedHomeSubCluster = [homeSubCluster] + FROM [dbo].[applicationsHomeSubCluster] + WHERE [applicationId] = @applicationId; + + COMMIT TRAN + END TRY + + BEGIN CATCH + ROLLBACK TRAN + + SET @errorMessage = dbo.func_FormatErrorMessage(ERROR_MESSAGE(), ERROR_LINE()) + + /* raise error and terminate the execution */ + RAISERROR(@errorMessage, --- Error Message + 1, -- Severity + -1 -- State + ) WITH log + END CATCH +END; +GO + +IF OBJECT_ID ( '[sp_updateApplicationHomeSubCluster]', 'P' ) IS NOT NULL + DROP PROCEDURE [sp_updateApplicationHomeSubCluster]; +GO + +CREATE PROCEDURE [dbo].[sp_updateApplicationHomeSubCluster] + @applicationId VARCHAR(64), + @homeSubCluster VARCHAR(256), + @rowCount int OUTPUT +AS BEGIN + DECLARE @errorMessage nvarchar(4000) + + BEGIN TRY + BEGIN TRAN + + UPDATE [dbo].[applicationsHomeSubCluster] + SET [homeSubCluster] = @homeSubCluster + WHERE [applicationId] = @applicationid; + SELECT @rowCount = @@ROWCOUNT; + + COMMIT TRAN + END TRY + + BEGIN CATCH + ROLLBACK TRAN + + SET @errorMessage = dbo.func_FormatErrorMessage(ERROR_MESSAGE(), ERROR_LINE()) + + /* raise error and terminate the execution */ + RAISERROR(@errorMessage, --- Error Message + 1, -- Severity + -1 -- State + ) WITH log + END CATCH +END; +GO + +IF OBJECT_ID ( '[sp_getApplicationsHomeSubCluster]', 'P' ) IS NOT NULL + DROP PROCEDURE [sp_getApplicationsHomeSubCluster]; +GO + +CREATE PROCEDURE [dbo].[sp_getApplicationsHomeSubCluster] +AS BEGIN + DECLARE @errorMessage nvarchar(4000) + + BEGIN TRY + SELECT [applicationId], [homeSubCluster], [createTime] + FROM [dbo].[applicationsHomeSubCluster] + END TRY + + BEGIN CATCH + SET @errorMessage = dbo.func_FormatErrorMessage(ERROR_MESSAGE(), ERROR_LINE()) + + /* raise error and terminate the execution */ + RAISERROR(@errorMessage, --- Error Message + 1, -- Severity + -1 -- State + ) WITH log + END CATCH +END; +GO + +IF OBJECT_ID ( '[sp_getApplicationHomeSubCluster]', 'P' ) IS NOT NULL + DROP PROCEDURE [sp_getApplicationHomeSubCluster]; +GO + +CREATE PROCEDURE [dbo].[sp_getApplicationHomeSubCluster] + @applicationId VARCHAR(64), + @homeSubCluster VARCHAR(256) OUTPUT +AS BEGIN + DECLARE @errorMessage nvarchar(4000) + + BEGIN TRY + + SELECT @homeSubCluster = [homeSubCluster] + FROM [dbo].[applicationsHomeSubCluster] + WHERE [applicationId] = @applicationid; + + END TRY + + BEGIN CATCH + + SET @errorMessage = dbo.func_FormatErrorMessage(ERROR_MESSAGE(), ERROR_LINE()) + + /* raise error and terminate the execution */ + RAISERROR(@errorMessage, --- Error Message + 1, -- Severity + -1 -- State + ) WITH log + END CATCH +END; +GO + +IF OBJECT_ID ( '[sp_deleteApplicationHomeSubCluster]', 'P' ) IS NOT NULL + DROP PROCEDURE [sp_deleteApplicationHomeSubCluster]; +GO + +CREATE PROCEDURE [dbo].[sp_deleteApplicationHomeSubCluster] + @applicationId VARCHAR(64), + @rowCount int OUTPUT +AS BEGIN + DECLARE @errorMessage nvarchar(4000) + + BEGIN TRY + BEGIN TRAN + + DELETE FROM [dbo].[applicationsHomeSubCluster] + WHERE [applicationId] = @applicationId; + SELECT @rowCount = @@ROWCOUNT; + + COMMIT TRAN + END TRY + + BEGIN CATCH + ROLLBACK TRAN + + SET @errorMessage = dbo.func_FormatErrorMessage(ERROR_MESSAGE(), ERROR_LINE()) + + /* raise error and terminate the execution */ + RAISERROR(@errorMessage, --- Error Message + 1, -- Severity + -1 -- State + ) WITH log + END CATCH +END; +GO + +IF OBJECT_ID ( '[sp_registerSubCluster]', 'P' ) IS NOT NULL + DROP PROCEDURE [sp_registerSubCluster]; +GO + +CREATE PROCEDURE [dbo].[sp_registerSubCluster] + @subClusterId VARCHAR(256), + @amRMServiceAddress VARCHAR(256), + @clientRMServiceAddress VARCHAR(256), + @rmAdminServiceAddress VARCHAR(256), + @rmWebServiceAddress VARCHAR(256), + @state VARCHAR(32), + @lastStartTime BIGINT, + @capability VARCHAR(6000), + @rowCount int OUTPUT +AS BEGIN + DECLARE @errorMessage nvarchar(4000) + + BEGIN TRY + BEGIN TRAN + + DELETE FROM [dbo].[membership] + WHERE [subClusterId] = @subClusterId; + INSERT INTO [dbo].[membership] ( + [subClusterId], + [amRMServiceAddress], + [clientRMServiceAddress], + [rmAdminServiceAddress], + [rmWebServiceAddress], + [lastHeartBeat], + [state], + [lastStartTime], + [capability] ) + VALUES ( + @subClusterId, + @amRMServiceAddress, + @clientRMServiceAddress, + @rmAdminServiceAddress, + @rmWebServiceAddress, + GETUTCDATE(), + @state, + @lastStartTime, + @capability); + SELECT @rowCount = @@ROWCOUNT; + + COMMIT TRAN + END TRY + + BEGIN CATCH + ROLLBACK TRAN + + SET @errorMessage = dbo.func_FormatErrorMessage(ERROR_MESSAGE(), ERROR_LINE()) + + /* raise error and terminate the execution */ + RAISERROR(@errorMessage, --- Error Message + 1, -- Severity + -1 -- State + ) WITH log + END CATCH +END; +GO + +IF OBJECT_ID ( '[sp_getSubClusters]', 'P' ) IS NOT NULL + DROP PROCEDURE [sp_getSubClusters]; +GO + +CREATE PROCEDURE [dbo].[sp_getSubClusters] +AS BEGIN + DECLARE @errorMessage nvarchar(4000) + + BEGIN TRY + SELECT [subClusterId], [amRMServiceAddress], [clientRMServiceAddress], + [rmAdminServiceAddress], [rmWebServiceAddress], [lastHeartBeat], + [state], [lastStartTime], [capability] + FROM [dbo].[membership] + END TRY + + BEGIN CATCH + SET @errorMessage = dbo.func_FormatErrorMessage(ERROR_MESSAGE(), ERROR_LINE()) + + /* raise error and terminate the execution */ + RAISERROR(@errorMessage, --- Error Message + 1, -- Severity + -1 -- State + ) WITH log + END CATCH +END; +GO + +IF OBJECT_ID ( '[sp_getSubCluster]', 'P' ) IS NOT NULL + DROP PROCEDURE [sp_getSubCluster]; +GO + +CREATE PROCEDURE [dbo].[sp_getSubCluster] + @subClusterId VARCHAR(256), + @amRMServiceAddress VARCHAR(256) OUTPUT, + @clientRMServiceAddress VARCHAR(256) OUTPUT, + @rmAdminServiceAddress VARCHAR(256) OUTPUT, + @rmWebServiceAddress VARCHAR(256) OUTPUT, + @lastHeartbeat DATETIME2 OUTPUT, + @state VARCHAR(256) OUTPUT, + @lastStartTime BIGINT OUTPUT, + @capability VARCHAR(6000) OUTPUT +AS BEGIN + DECLARE @errorMessage nvarchar(4000) + + BEGIN TRY + BEGIN TRAN + + SELECT @subClusterId = [subClusterId], + @amRMServiceAddress = [amRMServiceAddress], + @clientRMServiceAddress = [clientRMServiceAddress], + @rmAdminServiceAddress = [rmAdminServiceAddress], + @rmWebServiceAddress = [rmWebServiceAddress], + @lastHeartBeat = [lastHeartBeat], + @state = [state], + @lastStartTime = [lastStartTime], + @capability = [capability] + FROM [dbo].[membership] + WHERE [subClusterId] = @subClusterId + + COMMIT TRAN + END TRY + + BEGIN CATCH + ROLLBACK TRAN + + SET @errorMessage = dbo.func_FormatErrorMessage(ERROR_MESSAGE(), ERROR_LINE()) + + /* raise error and terminate the execution */ + RAISERROR(@errorMessage, --- Error Message + 1, -- Severity + -1 -- State + ) WITH log + END CATCH +END; +GO + + +IF OBJECT_ID ( '[sp_subClusterHeartbeat]', 'P' ) IS NOT NULL + DROP PROCEDURE [sp_subClusterHeartbeat]; +GO + +CREATE PROCEDURE [dbo].[sp_subClusterHeartbeat] + @subClusterId VARCHAR(256), + @state VARCHAR(256), + @capability VARCHAR(6000), + @rowCount int OUTPUT +AS BEGIN + DECLARE @errorMessage nvarchar(4000) + + BEGIN TRY + BEGIN TRAN + + UPDATE [dbo].[membership] + SET [state] = @state, + [lastHeartbeat] = GETUTCDATE(), + [capability] = @capability + WHERE [subClusterId] = @subClusterId; + SELECT @rowCount = @@ROWCOUNT; + + COMMIT TRAN + END TRY + + BEGIN CATCH + ROLLBACK TRAN + + SET @errorMessage = dbo.func_FormatErrorMessage(ERROR_MESSAGE(), ERROR_LINE()) + + /* raise error and terminate the execution */ + RAISERROR(@errorMessage, --- Error Message + 1, -- Severity + -1 -- State + ) WITH log + END CATCH +END; +GO + +IF OBJECT_ID ( '[sp_deregisterSubCluster]', 'P' ) IS NOT NULL + DROP PROCEDURE [sp_deregisterSubCluster]; +GO + +CREATE PROCEDURE [dbo].[sp_deregisterSubCluster] + @subClusterId VARCHAR(256), + @state VARCHAR(256), + @rowCount int OUTPUT +AS BEGIN + DECLARE @errorMessage nvarchar(4000) + + BEGIN TRY + BEGIN TRAN + + UPDATE [dbo].[membership] + SET [state] = @state + WHERE [subClusterId] = @subClusterId; + SELECT @rowCount = @@ROWCOUNT; + + COMMIT TRAN + END TRY + + BEGIN CATCH + ROLLBACK TRAN + + SET @errorMessage = dbo.func_FormatErrorMessage(ERROR_MESSAGE(), ERROR_LINE()) + + /* raise error and terminate the execution */ + RAISERROR(@errorMessage, --- Error Message + 1, -- Severity + -1 -- State + ) WITH log + END CATCH +END; +GO + +IF OBJECT_ID ( '[sp_setPolicyConfiguration]', 'P' ) IS NOT NULL + DROP PROCEDURE [sp_setPolicyConfiguration]; +GO + +CREATE PROCEDURE [dbo].[sp_setPolicyConfiguration] + @queue VARCHAR(256), + @policyType VARCHAR(256), + @params VARBINARY(512), + @rowCount int OUTPUT +AS BEGIN + DECLARE @errorMessage nvarchar(4000) + + BEGIN TRY + BEGIN TRAN + + DELETE FROM [dbo].[policies] + WHERE [queue] = @queue; + INSERT INTO [dbo].[policies] ( + [queue], + [policyType], + [params]) + VALUES ( + @queue, + @policyType, + @params); + SELECT @rowCount = @@ROWCOUNT; + + COMMIT TRAN + END TRY + + BEGIN CATCH + ROLLBACK TRAN + + SET @errorMessage = dbo.func_FormatErrorMessage(ERROR_MESSAGE(), ERROR_LINE()) + + /* raise error and terminate the execution */ + RAISERROR(@errorMessage, --- Error Message + 1, -- Severity + -1 -- State + ) WITH log + END CATCH +END; +GO + +IF OBJECT_ID ( '[sp_getPolicyConfiguration]', 'P' ) IS NOT NULL + DROP PROCEDURE [sp_getPolicyConfiguration]; +GO + +CREATE PROCEDURE [dbo].[sp_getPolicyConfiguration] + @queue VARCHAR(256), + @policyType VARCHAR(256) OUTPUT, + @params VARBINARY(6000) OUTPUT +AS BEGIN + DECLARE @errorMessage nvarchar(4000) + + BEGIN TRY + + SELECT @policyType = [policyType], + @params = [params] + FROM [dbo].[policies] + WHERE [queue] = @queue + + END TRY + + BEGIN CATCH + + SET @errorMessage = dbo.func_FormatErrorMessage(ERROR_MESSAGE(), ERROR_LINE()) + + /* raise error and terminate the execution */ + RAISERROR(@errorMessage, --- Error Message + 1, -- Severity + -1 -- State + ) WITH log + END CATCH +END; +GO + +IF OBJECT_ID ( '[sp_getPoliciesConfigurations]', 'P' ) IS NOT NULL + DROP PROCEDURE [sp_getPoliciesConfigurations]; +GO + +CREATE PROCEDURE [dbo].[sp_getPoliciesConfigurations] +AS BEGIN + DECLARE @errorMessage nvarchar(4000) + + BEGIN TRY + SELECT [queue], [policyType], [params] FROM [dbo].[policies] + END TRY + + BEGIN CATCH + SET @errorMessage = dbo.func_FormatErrorMessage(ERROR_MESSAGE(), ERROR_LINE()) + + /* raise error and terminate the execution */ + RAISERROR(@errorMessage, --- Error Message + 1, -- Severity + -1 -- State + ) WITH log + END CATCH +END; +GO \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/resources/FederationStateStore/SQLServer/FederationStateStoreTables.sql b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/resources/FederationStateStore/SQLServer/FederationStateStoreTables.sql new file mode 100644 index 0000000000..a97385b496 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/resources/FederationStateStore/SQLServer/FederationStateStoreTables.sql @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +USE [FederationStateStore] +GO + +IF NOT EXISTS ( SELECT * FROM [FederationStateStore].sys.tables + WHERE name = 'applicationsHomeSubCluster' + AND schema_id = SCHEMA_ID('dbo')) + BEGIN + PRINT 'Table applicationsHomeSubCluster does not exist, create it...' + + SET ANSI_NULLS ON + + SET QUOTED_IDENTIFIER ON + + SET ANSI_PADDING ON + + CREATE TABLE [dbo].[applicationsHomeSubCluster]( + applicationId VARCHAR(64) COLLATE Latin1_General_100_BIN2 NOT NULL, + homeSubCluster VARCHAR(256) NOT NULL, + createTime DATETIME2 NOT NULL CONSTRAINT ts_createAppTime DEFAULT GETUTCDATE(), + + CONSTRAINT [pk_applicationId] PRIMARY KEY + ( + [applicationId] + ) + ) + + SET ANSI_PADDING OFF + + PRINT 'Table applicationsHomeSubCluster created.' + END +ELSE + PRINT 'Table applicationsHomeSubCluster exists, no operation required...' + GO +GO + +IF NOT EXISTS ( SELECT * FROM [FederationStateStore].sys.tables + WHERE name = 'membership' + AND schema_id = SCHEMA_ID('dbo')) + BEGIN + PRINT 'Table membership does not exist, create it...' + + SET ANSI_NULLS ON + + SET QUOTED_IDENTIFIER ON + + SET ANSI_PADDING ON + + CREATE TABLE [dbo].[membership]( + [subClusterId] VARCHAR(256) COLLATE Latin1_General_100_BIN2 NOT NULL, + [amRMServiceAddress] VARCHAR(256) NOT NULL, + [clientRMServiceAddress] VARCHAR(256) NOT NULL, + [rmAdminServiceAddress] VARCHAR(256) NOT NULL, + [rmWebServiceAddress] VARCHAR(256) NOT NULL, + [lastHeartBeat] DATETIME2 NOT NULL, + [state] VARCHAR(32) NOT NULL, + [lastStartTime] BIGINT NOT NULL, + [capability] VARCHAR(6000) NOT NULL, + + CONSTRAINT [pk_subClusterId] PRIMARY KEY + ( + [subClusterId] + ) + ) + + SET ANSI_PADDING OFF + + PRINT 'Table membership created.' + END +ELSE + PRINT 'Table membership exists, no operation required...' + GO +GO + +IF NOT EXISTS ( SELECT * FROM [FederationStateStore].sys.tables + WHERE name = 'policies' + AND schema_id = SCHEMA_ID('dbo')) + BEGIN + PRINT 'Table policies does not exist, create it...' + + SET ANSI_NULLS ON + + SET QUOTED_IDENTIFIER ON + + SET ANSI_PADDING ON + + CREATE TABLE [dbo].[policies]( + queue VARCHAR(256) COLLATE Latin1_General_100_BIN2 NOT NULL, + policyType VARCHAR(256) NOT NULL, + params VARBINARY(6000) NOT NULL, + + CONSTRAINT [pk_queue] PRIMARY KEY + ( + [queue] + ) + ) + + SET ANSI_PADDING OFF + + PRINT 'Table policies created.' + END +ELSE + PRINT 'Table policies exists, no operation required...' + GO +GO