YARN-11548. [Federation] Router Supports Format FederationStateStore. (#6116) Contributed by Shilun Fan.
Reviewed-by: Inigo Goiri <inigoiri@apache.org> Signed-off-by: Shilun Fan <slfan1989@apache.org>
This commit is contained in:
parent
597ceaae3a
commit
72d7b43a32
@ -101,4 +101,11 @@ default void checkVersion() throws Exception {
|
||||
", but loading version " + loadedVersion);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* We will clear the data in stateStore through the deleteStateStore method.
|
||||
*
|
||||
* @throws Exception an exception occurred in delete store.
|
||||
*/
|
||||
void deleteStateStore() throws Exception;
|
||||
}
|
||||
|
@ -419,6 +419,16 @@ public void storeVersion() throws Exception {
|
||||
version = ((VersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deleteStateStore() throws Exception {
|
||||
membership.clear();
|
||||
applications.clear();
|
||||
reservations.clear();
|
||||
policies.clear();
|
||||
sequenceNum = new AtomicInteger();
|
||||
masterKeyId = new AtomicInteger();
|
||||
}
|
||||
|
||||
@Override
|
||||
public AddReservationHomeSubClusterResponse addReservationHomeSubCluster(
|
||||
AddReservationHomeSubClusterRequest request) throws YarnException {
|
||||
|
@ -29,6 +29,7 @@
|
||||
import java.sql.Timestamp;
|
||||
import java.sql.Types;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Calendar;
|
||||
import java.util.List;
|
||||
import java.util.TimeZone;
|
||||
@ -216,6 +217,10 @@ public class SQLFederationStateStore implements FederationStateStore {
|
||||
private static final String CALL_SP_LOAD_VERSION =
|
||||
"{call sp_getVersion(?, ?)}";
|
||||
|
||||
private static final List<String> TABLES = new ArrayList<>(
|
||||
Arrays.asList("applicationsHomeSubCluster", "membership", "policies", "versions",
|
||||
"reservationsHomeSubCluster", "masterKeys", "delegationTokens", "sequenceTable"));
|
||||
|
||||
private Calendar utcCalendar =
|
||||
Calendar.getInstance(TimeZone.getTimeZone("UTC"));
|
||||
|
||||
@ -1122,6 +1127,11 @@ public void storeVersion() throws Exception {
|
||||
storeVersion(fedVersion, versionComment);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deleteStateStore() throws Exception {
|
||||
truncateTable();
|
||||
}
|
||||
|
||||
/**
|
||||
* Store the Federation Version in the database.
|
||||
*
|
||||
@ -2077,6 +2087,32 @@ private int querySequenceTable(String sequenceName, boolean isUpdate){
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* We will truncate the tables, iterate through each table individually,
|
||||
* and then clean the tables.
|
||||
*/
|
||||
private void truncateTable() {
|
||||
Connection connection = null;
|
||||
try {
|
||||
connection = getConnection(false);
|
||||
FederationQueryRunner runner = new FederationQueryRunner();
|
||||
for (String table : TABLES) {
|
||||
LOG.info("truncate table = {} start.", table);
|
||||
runner.truncateTable(connection, table);
|
||||
LOG.info("truncate table = {} finished.", table);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException("Could not truncate table!", e);
|
||||
} finally {
|
||||
// Return to the pool the CallableStatement
|
||||
try {
|
||||
FederationStateStoreUtils.returnToPool(LOG, null, connection);
|
||||
} catch (YarnException e) {
|
||||
LOG.error("close connection error.", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public HikariDataSource getDataSource() {
|
||||
return dataSource;
|
||||
|
@ -226,6 +226,8 @@ public class ZookeeperFederationStateStore implements FederationStateStore {
|
||||
private ZKFederationStateStoreOpDurations opDurations =
|
||||
ZKFederationStateStoreOpDurations.getInstance();
|
||||
|
||||
private Configuration configuration;
|
||||
|
||||
/*
|
||||
* Indicates different app attempt state store operations.
|
||||
*/
|
||||
@ -251,7 +253,7 @@ private final static class AppNodeSplitInfo {
|
||||
public void init(Configuration conf) throws YarnException {
|
||||
|
||||
LOG.info("Initializing ZooKeeper connection");
|
||||
|
||||
this.configuration = conf;
|
||||
maxAppsInStateStore = conf.getInt(
|
||||
YarnConfiguration.FEDERATION_STATESTORE_MAX_APPLICATIONS,
|
||||
YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_MAX_APPLICATIONS);
|
||||
@ -273,13 +275,8 @@ public void init(Configuration conf) throws YarnException {
|
||||
reservationsZNode = getNodePath(baseZNode, ROOT_ZNODE_NAME_RESERVATION);
|
||||
versionNode = getNodePath(baseZNode, ROOT_ZNODE_NAME_VERSION);
|
||||
|
||||
String hierarchiesPath = getNodePath(appsZNode, ROUTER_APP_ROOT_HIERARCHIES);
|
||||
routerAppRootHierarchies = new HashMap<>();
|
||||
routerAppRootHierarchies.put(0, appsZNode);
|
||||
for (int splitIndex = 1; splitIndex <= HIERARCHIES_LEVEL; splitIndex++) {
|
||||
routerAppRootHierarchies.put(splitIndex,
|
||||
getNodePath(hierarchiesPath, Integer.toString(splitIndex)));
|
||||
}
|
||||
// Initialize hierarchical path
|
||||
initHierarchiesPath();
|
||||
|
||||
appIdNodeSplitIndex = conf.getInt(YarnConfiguration.ZK_APPID_NODE_SPLIT_INDEX,
|
||||
YarnConfiguration.DEFAULT_ZK_APPID_NODE_SPLIT_INDEX);
|
||||
@ -302,26 +299,7 @@ public void init(Configuration conf) throws YarnException {
|
||||
ROUTER_RM_DT_MASTER_KEY_ID_ZNODE_NAME);
|
||||
|
||||
// Create base znode for each entity
|
||||
try {
|
||||
List<ACL> zkAcl = ZKCuratorManager.getZKAcls(conf);
|
||||
zkManager.createRootDirRecursively(membershipZNode, zkAcl);
|
||||
zkManager.createRootDirRecursively(appsZNode, zkAcl);
|
||||
zkManager.createRootDirRecursively(
|
||||
getNodePath(appsZNode, ROUTER_APP_ROOT_HIERARCHIES));
|
||||
for (int splitIndex = 1; splitIndex <= HIERARCHIES_LEVEL; splitIndex++) {
|
||||
zkManager.createRootDirRecursively(
|
||||
routerAppRootHierarchies.get(splitIndex));
|
||||
}
|
||||
zkManager.createRootDirRecursively(policiesZNode, zkAcl);
|
||||
zkManager.createRootDirRecursively(reservationsZNode, zkAcl);
|
||||
zkManager.createRootDirRecursively(routerRMDTSecretManagerRoot, zkAcl);
|
||||
zkManager.createRootDirRecursively(routerRMDTMasterKeysRootPath, zkAcl);
|
||||
zkManager.createRootDirRecursively(routerRMDelegationTokensRootPath, zkAcl);
|
||||
zkManager.createRootDirRecursively(versionNode, zkAcl);
|
||||
} catch (Exception e) {
|
||||
String errMsg = "Cannot create base directories: " + e.getMessage();
|
||||
FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
|
||||
}
|
||||
createBaseZNodeForEachEntity();
|
||||
|
||||
// Distributed sequenceNum.
|
||||
try {
|
||||
@ -831,6 +809,60 @@ public void storeVersion() throws Exception {
|
||||
put(versionNode, data, isUpdate);
|
||||
}
|
||||
|
||||
private void initHierarchiesPath() {
|
||||
String hierarchiesPath = getNodePath(appsZNode, ROUTER_APP_ROOT_HIERARCHIES);
|
||||
routerAppRootHierarchies = new HashMap<>();
|
||||
routerAppRootHierarchies.put(0, appsZNode);
|
||||
for (int splitIndex = 1; splitIndex <= HIERARCHIES_LEVEL; splitIndex++) {
|
||||
routerAppRootHierarchies.put(splitIndex,
|
||||
getNodePath(hierarchiesPath, Integer.toString(splitIndex)));
|
||||
}
|
||||
}
|
||||
|
||||
private void createBaseZNodeForEachEntity() throws YarnException {
|
||||
try {
|
||||
List<ACL> zkAcl = ZKCuratorManager.getZKAcls(configuration);
|
||||
zkManager.createRootDirRecursively(membershipZNode, zkAcl);
|
||||
zkManager.createRootDirRecursively(appsZNode, zkAcl);
|
||||
zkManager.createRootDirRecursively(
|
||||
getNodePath(appsZNode, ROUTER_APP_ROOT_HIERARCHIES));
|
||||
for (int splitIndex = 1; splitIndex <= HIERARCHIES_LEVEL; splitIndex++) {
|
||||
zkManager.createRootDirRecursively(
|
||||
routerAppRootHierarchies.get(splitIndex));
|
||||
}
|
||||
zkManager.createRootDirRecursively(policiesZNode, zkAcl);
|
||||
zkManager.createRootDirRecursively(reservationsZNode, zkAcl);
|
||||
zkManager.createRootDirRecursively(routerRMDTSecretManagerRoot, zkAcl);
|
||||
zkManager.createRootDirRecursively(routerRMDTMasterKeysRootPath, zkAcl);
|
||||
zkManager.createRootDirRecursively(routerRMDelegationTokensRootPath, zkAcl);
|
||||
zkManager.createRootDirRecursively(versionNode, zkAcl);
|
||||
} catch (Exception e) {
|
||||
String errMsg = "Cannot create base directories: " + e.getMessage();
|
||||
FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deleteStateStore() throws Exception {
|
||||
|
||||
// Cleaning ZNodes and their child nodes;
|
||||
// after the cleaning is complete, the ZNodes will no longer exist.
|
||||
zkManager.delete(appsZNode);
|
||||
zkManager.delete(membershipZNode);
|
||||
zkManager.delete(policiesZNode);
|
||||
zkManager.delete(reservationsZNode);
|
||||
zkManager.delete(routerRMDTSecretManagerRoot);
|
||||
zkManager.delete(routerRMDTMasterKeysRootPath);
|
||||
zkManager.delete(routerRMDelegationTokensRootPath);
|
||||
zkManager.delete(versionNode);
|
||||
|
||||
// Initialize hierarchical path
|
||||
initHierarchiesPath();
|
||||
|
||||
// We will continue to create ZNodes to ensure that the base path exists.
|
||||
createBaseZNodeForEachEntity();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the subcluster for an application.
|
||||
*
|
||||
|
@ -294,6 +294,39 @@ public void updateSequenceTable(Connection connection, String sequenceName, int
|
||||
}
|
||||
}
|
||||
|
||||
public void truncateTable(Connection connection, String tableName)
|
||||
throws SQLException {
|
||||
DbType dbType = DatabaseProduct.getDbType(connection);
|
||||
String deleteSQL = getTruncateStatement(dbType, tableName);
|
||||
boolean committed = false;
|
||||
Statement statement = null;
|
||||
try {
|
||||
statement = connection.createStatement();
|
||||
statement.execute(deleteSQL);
|
||||
connection.commit();
|
||||
committed = true;
|
||||
} catch (SQLException e) {
|
||||
throw new SQLException("Unable to truncateTable due to: " + e.getMessage());
|
||||
} finally {
|
||||
if (!committed) {
|
||||
rollbackDBConn(connection);
|
||||
}
|
||||
close(statement);
|
||||
}
|
||||
}
|
||||
|
||||
private String getTruncateStatement(DbType dbType, String tableName) {
|
||||
if (isMYSQL(dbType)) {
|
||||
return ("DELETE FROM \"" + tableName + "\"");
|
||||
} else {
|
||||
return("DELETE FROM " + tableName);
|
||||
}
|
||||
}
|
||||
|
||||
private boolean isMYSQL(DbType dbType) {
|
||||
return dbType == DbType.MYSQL;
|
||||
}
|
||||
|
||||
static void rollbackDBConn(Connection dbConn) {
|
||||
try {
|
||||
if (dbConn != null && !dbConn.isClosed()) {
|
||||
|
@ -1118,4 +1118,8 @@ public ApplicationSubmissionContext getApplicationSubmissionContext(ApplicationI
|
||||
public FederationCache getFederationCache() {
|
||||
return federationCache;
|
||||
}
|
||||
|
||||
public void deleteStore() throws Exception {
|
||||
stateStore.deleteStateStore();
|
||||
}
|
||||
}
|
||||
|
@ -91,6 +91,7 @@
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
|
||||
/**
|
||||
* Base class for FederationMembershipStateStore implementations.
|
||||
@ -120,6 +121,7 @@ public void before() throws IOException, YarnException {
|
||||
|
||||
@After
|
||||
public void after() throws Exception {
|
||||
testDeleteStateStore();
|
||||
stateStore.close();
|
||||
}
|
||||
|
||||
@ -1112,4 +1114,26 @@ public void testGetApplicationHomeSubClusterWithContext() throws Exception {
|
||||
assertEquals(subClusterId, applicationHomeSubCluster.getHomeSubCluster());
|
||||
assertEquals(context, applicationHomeSubCluster.getApplicationSubmissionContext());
|
||||
}
|
||||
|
||||
public void testDeleteStateStore() throws Exception {
|
||||
// Step1. We clean the StateStore.
|
||||
FederationStateStore federationStateStore = this.getStateStore();
|
||||
federationStateStore.deleteStateStore();
|
||||
|
||||
// Step2. When we query the sub-cluster information, it should not exist.
|
||||
GetSubClustersInfoRequest request = GetSubClustersInfoRequest.newInstance(true);
|
||||
List<SubClusterInfo> subClustersActive = stateStore.getSubClusters(request).getSubClusters();
|
||||
assertNotNull(subClustersActive);
|
||||
assertEquals(0, subClustersActive.size());
|
||||
|
||||
// Step3. When we query the applications' information, it should not exist.
|
||||
GetApplicationsHomeSubClusterRequest getRequest =
|
||||
GetApplicationsHomeSubClusterRequest.newInstance();
|
||||
GetApplicationsHomeSubClusterResponse result =
|
||||
stateStore.getApplicationsHomeSubCluster(getRequest);
|
||||
assertNotNull(result);
|
||||
List<ApplicationHomeSubCluster> appsHomeSubClusters = result.getAppsHomeSubClusters();
|
||||
assertNotNull(appsHomeSubClusters);
|
||||
assertEquals(0, appsHomeSubClusters.size());
|
||||
}
|
||||
}
|
||||
|
@ -106,6 +106,7 @@ protected FederationStateStore createStateStore() {
|
||||
conf.set(YarnConfiguration.FEDERATION_STATESTORE_SQL_URL,
|
||||
DATABASE_URL + System.currentTimeMillis());
|
||||
conf.setInt(YarnConfiguration.FEDERATION_STATESTORE_MAX_APPLICATIONS, 10);
|
||||
conf.setInt(YarnConfiguration.FEDERATION_STATESTORE_SQL_MAXCONNECTIONS, 10);
|
||||
super.setConf(conf);
|
||||
sqlFederationStateStore = new HSQLDBFederationStateStore();
|
||||
return sqlFederationStateStore;
|
||||
@ -647,6 +648,6 @@ public void testCheckHikariDataSourceParam() throws SQLException {
|
||||
assertEquals(10000, connTimeOut);
|
||||
assertEquals("YARN-Federation-DataBasePool", poolName);
|
||||
assertEquals(1, minimumIdle);
|
||||
assertEquals(1, maximumPoolSize);
|
||||
assertEquals(10, maximumPoolSize);
|
||||
}
|
||||
}
|
@ -287,6 +287,11 @@ public void checkVersion() throws Exception {
|
||||
stateStoreClient.checkVersion();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deleteStateStore() throws Exception {
|
||||
stateStoreClient.deleteStateStore();
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetSubClusterPolicyConfigurationResponse getPolicyConfiguration(
|
||||
GetSubClusterPolicyConfigurationRequest request) throws YarnException {
|
||||
|
@ -373,9 +373,15 @@ public static void removeApplication(Configuration conf, String applicationId)
|
||||
LOG.info("Application is deleted from state store");
|
||||
}
|
||||
|
||||
private static void handFormatStateStore() {
|
||||
// TODO: YARN-11548. [Federation] Router Supports Format FederationStateStore.
|
||||
System.err.println("format-state-store is not yet supported.");
|
||||
private static void handFormatStateStore(Configuration conf) {
|
||||
try {
|
||||
System.out.println("Deleting Federation state store.");
|
||||
FederationStateStoreFacade facade = FederationStateStoreFacade.getInstance(conf);
|
||||
System.out.println("Federation state store has been cleaned.");
|
||||
facade.deleteStore();
|
||||
} catch (Exception e) {
|
||||
System.err.println("Delete Federation state store error, exception = " + e);
|
||||
}
|
||||
}
|
||||
|
||||
private static void handRemoveApplicationFromStateStore(Configuration conf,
|
||||
@ -409,7 +415,7 @@ private static void executeRouterCommand(Configuration conf, String[] args) {
|
||||
CommandLine cliParser = new DefaultParser().parse(opts, args);
|
||||
|
||||
if (CMD_FORMAT_STATE_STORE.equals(cmd)) {
|
||||
handFormatStateStore();
|
||||
handFormatStateStore(conf);
|
||||
} else if (CMD_REMOVE_APPLICATION_FROM_STATE_STORE.equals(cmd)) {
|
||||
if (cliParser.hasOption(removeApplicationFromStateStoreOpt)) {
|
||||
String applicationId = cliParser.getOptionValue(removeApplicationFromStateStoreOpt);
|
||||
|
Loading…
Reference in New Issue
Block a user