YARN-11290. Improve Query Condition of FederationStateStore#getApplicationsHomeSubCluster. (#4846)

This commit is contained in:
slfan1989 2022-09-28 04:28:52 +08:00 committed by GitHub
parent 735e35d648
commit bfd6415827
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 377 additions and 38 deletions

View File

@ -122,10 +122,21 @@ BEGIN
WHERE applicationId = applicationID_IN; WHERE applicationId = applicationID_IN;
END // END //
CREATE PROCEDURE sp_getApplicationsHomeSubCluster() CREATE PROCEDURE sp_getApplicationsHomeSubCluster(IN limit_IN int, IN homeSubCluster_IN varchar(256))
BEGIN BEGIN
SELECT applicationId, homeSubCluster SELECT
FROM applicationsHomeSubCluster; applicationId,
homeSubCluster,
createTime
FROM (SELECT
applicationId,
homeSubCluster,
createTime,
@rownum := 0
FROM applicationshomesubcluster
ORDER BY createTime DESC) AS applicationshomesubcluster
WHERE (homeSubCluster_IN = '' OR homeSubCluster = homeSubCluster_IN)
AND (@rownum := @rownum + 1) <= limit_IN;
END // END //
CREATE PROCEDURE sp_deleteApplicationHomeSubCluster( CREATE PROCEDURE sp_deleteApplicationHomeSubCluster(

View File

@ -22,7 +22,8 @@ USE FederationStateStore
CREATE TABLE applicationsHomeSubCluster( CREATE TABLE applicationsHomeSubCluster(
applicationId varchar(64) NOT NULL, applicationId varchar(64) NOT NULL,
homeSubCluster varchar(256) NULL, homeSubCluster varchar(256) NOT NULL,
createTime datetime NOT NULL,
CONSTRAINT pk_applicationId PRIMARY KEY (applicationId) CONSTRAINT pk_applicationId PRIMARY KEY (applicationId)
); );

View File

@ -111,12 +111,26 @@ IF OBJECT_ID ( '[sp_getApplicationsHomeSubCluster]', 'P' ) IS NOT NULL
GO GO
CREATE PROCEDURE [dbo].[sp_getApplicationsHomeSubCluster] CREATE PROCEDURE [dbo].[sp_getApplicationsHomeSubCluster]
@limit int,
@homeSubCluster VARCHAR(256)
AS BEGIN AS BEGIN
DECLARE @errorMessage nvarchar(4000) DECLARE @errorMessage nvarchar(4000)
BEGIN TRY BEGIN TRY
SELECT [applicationId], [homeSubCluster], [createTime]
FROM [dbo].[applicationsHomeSubCluster] SELECT
[applicationId],
[homeSubCluster],
[createTime]
FROM(SELECT
[applicationId],
[homeSubCluster],
[createTime],
row_number() over(order by [createTime] desc) AS app_rank
FROM [dbo].[applicationsHomeSubCluster]
WHERE [homeSubCluster] = @homeSubCluster OR @homeSubCluster = '') AS applicationsHomeSubCluster
WHERE app_rank <= @limit;
END TRY END TRY
BEGIN CATCH BEGIN CATCH

View File

@ -4056,6 +4056,11 @@ public static boolean isAclEnabled(Configuration conf) {
public static final int DEFAULT_FEDERATION_STATESTORE_SQL_MAXCONNECTIONS = 1; public static final int DEFAULT_FEDERATION_STATESTORE_SQL_MAXCONNECTIONS = 1;
public static final String FEDERATION_STATESTORE_MAX_APPLICATIONS =
FEDERATION_PREFIX + "state-store.max-applications";
public static final int DEFAULT_FEDERATION_STATESTORE_MAX_APPLICATIONS = 1000;
public static final String ROUTER_PREFIX = YARN_PREFIX + "router."; public static final String ROUTER_PREFIX = YARN_PREFIX + "router.";
public static final String ROUTER_BIND_HOST = ROUTER_PREFIX + "bind-host"; public static final String ROUTER_BIND_HOST = ROUTER_PREFIX + "bind-host";

View File

@ -5007,4 +5007,13 @@
</description> </description>
</property> </property>
<property>
<name>yarn.federation.state-store.max-applications</name>
<value>1000</value>
<description>
Yarn federation state-store supports querying the maximum number of apps.
Default is 1000.
</description>
</property>
</configuration> </configuration>

View File

@ -27,12 +27,16 @@
import java.util.Set; import java.util.Set;
import java.util.TimeZone; import java.util.TimeZone;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.Comparator;
import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.token.delegation.DelegationKey; import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest; import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
@ -90,6 +94,8 @@
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import static org.apache.hadoop.yarn.server.federation.store.utils.FederationStateStoreUtils.filterHomeSubCluster;
/** /**
* In-memory implementation of {@link FederationStateStore}. * In-memory implementation of {@link FederationStateStore}.
*/ */
@ -100,6 +106,7 @@ public class MemoryFederationStateStore implements FederationStateStore {
private Map<ReservationId, SubClusterId> reservations; private Map<ReservationId, SubClusterId> reservations;
private Map<String, SubClusterPolicyConfiguration> policies; private Map<String, SubClusterPolicyConfiguration> policies;
private RouterRMDTSecretManagerState routerRMSecretManagerState; private RouterRMDTSecretManagerState routerRMSecretManagerState;
private int maxAppsInStateStore;
private final MonotonicClock clock = new MonotonicClock(); private final MonotonicClock clock = new MonotonicClock();
@ -113,6 +120,9 @@ public void init(Configuration conf) {
reservations = new ConcurrentHashMap<ReservationId, SubClusterId>(); reservations = new ConcurrentHashMap<ReservationId, SubClusterId>();
policies = new ConcurrentHashMap<String, SubClusterPolicyConfiguration>(); policies = new ConcurrentHashMap<String, SubClusterPolicyConfiguration>();
routerRMSecretManagerState = new RouterRMDTSecretManagerState(); routerRMSecretManagerState = new RouterRMDTSecretManagerState();
maxAppsInStateStore = conf.getInt(
YarnConfiguration.FEDERATION_STATESTORE_MAX_APPLICATIONS,
YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_MAX_APPLICATIONS);
} }
@Override @Override
@ -266,17 +276,28 @@ public GetApplicationHomeSubClusterResponse getApplicationHomeSubCluster(
@Override @Override
public GetApplicationsHomeSubClusterResponse getApplicationsHomeSubCluster( public GetApplicationsHomeSubClusterResponse getApplicationsHomeSubCluster(
GetApplicationsHomeSubClusterRequest request) throws YarnException { GetApplicationsHomeSubClusterRequest request) throws YarnException {
List<ApplicationHomeSubCluster> result =
new ArrayList<ApplicationHomeSubCluster>(); if (request == null) {
for (Entry<ApplicationId, SubClusterId> e : applications.entrySet()) { throw new YarnException("Missing getApplicationsHomeSubCluster request");
result
.add(ApplicationHomeSubCluster.newInstance(e.getKey(), e.getValue()));
} }
GetApplicationsHomeSubClusterResponse.newInstance(result); SubClusterId requestSC = request.getSubClusterId();
List<ApplicationHomeSubCluster> result = applications.keySet().stream()
.map(applicationId -> generateAppHomeSC(applicationId))
.sorted(Comparator.comparing(ApplicationHomeSubCluster::getCreateTime).reversed())
.filter(appHomeSC -> filterHomeSubCluster(requestSC, appHomeSC.getHomeSubCluster()))
.limit(maxAppsInStateStore)
.collect(Collectors.toList());
LOG.info("filterSubClusterId = {}, appCount = {}.", requestSC, result.size());
return GetApplicationsHomeSubClusterResponse.newInstance(result); return GetApplicationsHomeSubClusterResponse.newInstance(result);
} }
private ApplicationHomeSubCluster generateAppHomeSC(ApplicationId applicationId) {
SubClusterId subClusterId = applications.get(applicationId);
return ApplicationHomeSubCluster.newInstance(applicationId, Time.now(), subClusterId);
}
@Override @Override
public DeleteApplicationHomeSubClusterResponse deleteApplicationHomeSubCluster( public DeleteApplicationHomeSubClusterResponse deleteApplicationHomeSubCluster(
DeleteApplicationHomeSubClusterRequest request) throws YarnException { DeleteApplicationHomeSubClusterRequest request) throws YarnException {

View File

@ -136,7 +136,7 @@ public class SQLFederationStateStore implements FederationStateStore {
"{call sp_getApplicationHomeSubCluster(?, ?)}"; "{call sp_getApplicationHomeSubCluster(?, ?)}";
private static final String CALL_SP_GET_APPLICATIONS_HOME_SUBCLUSTER = private static final String CALL_SP_GET_APPLICATIONS_HOME_SUBCLUSTER =
"{call sp_getApplicationsHomeSubCluster()}"; "{call sp_getApplicationsHomeSubCluster(?, ?)}";
private static final String CALL_SP_SET_POLICY_CONFIGURATION = private static final String CALL_SP_SET_POLICY_CONFIGURATION =
"{call sp_setPolicyConfiguration(?, ?, ?, ?)}"; "{call sp_setPolicyConfiguration(?, ?, ?, ?)}";
@ -176,6 +176,7 @@ public class SQLFederationStateStore implements FederationStateStore {
private final Clock clock = new MonotonicClock(); private final Clock clock = new MonotonicClock();
@VisibleForTesting @VisibleForTesting
Connection conn = null; Connection conn = null;
private int maxAppsInStateStore;
@Override @Override
public void init(Configuration conf) throws YarnException { public void init(Configuration conf) throws YarnException {
@ -215,6 +216,10 @@ public void init(Configuration conf) throws YarnException {
FederationStateStoreUtils.logAndThrowRetriableException(LOG, FederationStateStoreUtils.logAndThrowRetriableException(LOG,
"Not able to get Connection", e); "Not able to get Connection", e);
} }
maxAppsInStateStore = conf.getInt(
YarnConfiguration.FEDERATION_STATESTORE_MAX_APPLICATIONS,
YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_MAX_APPLICATIONS);
} }
@Override @Override
@ -748,24 +753,35 @@ public GetApplicationHomeSubClusterResponse getApplicationHomeSubCluster(
@Override @Override
public GetApplicationsHomeSubClusterResponse getApplicationsHomeSubCluster( public GetApplicationsHomeSubClusterResponse getApplicationsHomeSubCluster(
GetApplicationsHomeSubClusterRequest request) throws YarnException { GetApplicationsHomeSubClusterRequest request) throws YarnException {
if (request == null) {
throw new YarnException("Missing getApplicationsHomeSubCluster request");
}
CallableStatement cstmt = null; CallableStatement cstmt = null;
ResultSet rs = null; ResultSet rs = null;
List<ApplicationHomeSubCluster> appsHomeSubClusters = List<ApplicationHomeSubCluster> appsHomeSubClusters = new ArrayList<>();
new ArrayList<ApplicationHomeSubCluster>();
try { try {
cstmt = getCallableStatement(CALL_SP_GET_APPLICATIONS_HOME_SUBCLUSTER); cstmt = getCallableStatement(CALL_SP_GET_APPLICATIONS_HOME_SUBCLUSTER);
cstmt.setInt("limit_IN", maxAppsInStateStore);
String homeSubClusterIN = StringUtils.EMPTY;
SubClusterId subClusterId = request.getSubClusterId();
if (subClusterId != null) {
homeSubClusterIN = subClusterId.toString();
}
cstmt.setString("homeSubCluster_IN", homeSubClusterIN);
// Execute the query // Execute the query
long startTime = clock.getTime(); long startTime = clock.getTime();
rs = cstmt.executeQuery(); rs = cstmt.executeQuery();
long stopTime = clock.getTime(); long stopTime = clock.getTime();
while (rs.next()) { while (rs.next() && appsHomeSubClusters.size() <= maxAppsInStateStore) {
// Extract the output for each tuple // Extract the output for each tuple
String applicationId = rs.getString(1); String applicationId = rs.getString("applicationId");
String homeSubCluster = rs.getString(2); String homeSubCluster = rs.getString("homeSubCluster");
appsHomeSubClusters.add(ApplicationHomeSubCluster.newInstance( appsHomeSubClusters.add(ApplicationHomeSubCluster.newInstance(
ApplicationId.fromString(applicationId), ApplicationId.fromString(applicationId),
@ -783,8 +799,8 @@ public GetApplicationsHomeSubClusterResponse getApplicationsHomeSubCluster(
// Return to the pool the CallableStatement // Return to the pool the CallableStatement
FederationStateStoreUtils.returnToPool(LOG, cstmt, null, rs); FederationStateStoreUtils.returnToPool(LOG, cstmt, null, rs);
} }
return GetApplicationsHomeSubClusterResponse
.newInstance(appsHomeSubClusters); return GetApplicationsHomeSubClusterResponse.newInstance(appsHomeSubClusters);
} }
@Override @Override

View File

@ -24,10 +24,13 @@
import java.util.Calendar; import java.util.Calendar;
import java.util.List; import java.util.List;
import java.util.TimeZone; import java.util.TimeZone;
import java.util.Comparator;
import java.util.stream.Collectors;
import org.apache.commons.lang3.NotImplementedException; import org.apache.commons.lang3.NotImplementedException;
import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.curator.ZKCuratorManager; import org.apache.hadoop.util.curator.ZKCuratorManager;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
@ -98,6 +101,8 @@
import org.apache.hadoop.thirdparty.protobuf.InvalidProtocolBufferException; import org.apache.hadoop.thirdparty.protobuf.InvalidProtocolBufferException;
import static org.apache.hadoop.yarn.server.federation.store.utils.FederationStateStoreUtils.filterHomeSubCluster;
/** /**
* ZooKeeper implementation of {@link FederationStateStore}. * ZooKeeper implementation of {@link FederationStateStore}.
* *
@ -136,6 +141,7 @@ public class ZookeeperFederationStateStore implements FederationStateStore {
private String membershipZNode; private String membershipZNode;
private String policiesZNode; private String policiesZNode;
private String reservationsZNode; private String reservationsZNode;
private int maxAppsInStateStore;
private volatile Clock clock = SystemClock.getInstance(); private volatile Clock clock = SystemClock.getInstance();
@ -147,6 +153,10 @@ public class ZookeeperFederationStateStore implements FederationStateStore {
public void init(Configuration conf) throws YarnException { public void init(Configuration conf) throws YarnException {
LOG.info("Initializing ZooKeeper connection"); LOG.info("Initializing ZooKeeper connection");
maxAppsInStateStore = conf.getInt(
YarnConfiguration.FEDERATION_STATESTORE_MAX_APPLICATIONS,
YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_MAX_APPLICATIONS);
baseZNode = conf.get( baseZNode = conf.get(
YarnConfiguration.FEDERATION_STATESTORE_ZK_PARENT_PATH, YarnConfiguration.FEDERATION_STATESTORE_ZK_PARENT_PATH,
YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_ZK_PARENT_PATH); YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_ZK_PARENT_PATH);
@ -258,24 +268,44 @@ public GetApplicationHomeSubClusterResponse getApplicationHomeSubCluster(
@Override @Override
public GetApplicationsHomeSubClusterResponse getApplicationsHomeSubCluster( public GetApplicationsHomeSubClusterResponse getApplicationsHomeSubCluster(
GetApplicationsHomeSubClusterRequest request) throws YarnException { GetApplicationsHomeSubClusterRequest request) throws YarnException {
long start = clock.getTime();
List<ApplicationHomeSubCluster> result = new ArrayList<>(); if (request == null) {
throw new YarnException("Missing getApplicationsHomeSubCluster request");
}
try { try {
for (String child : zkManager.getChildren(appsZNode)) { long start = clock.getTime();
ApplicationId appId = ApplicationId.fromString(child); SubClusterId requestSC = request.getSubClusterId();
SubClusterId homeSubCluster = getApp(appId); List<String> children = zkManager.getChildren(appsZNode);
ApplicationHomeSubCluster app = List<ApplicationHomeSubCluster> result = children.stream()
ApplicationHomeSubCluster.newInstance(appId, homeSubCluster); .map(child -> generateAppHomeSC(child))
result.add(app); .sorted(Comparator.comparing(ApplicationHomeSubCluster::getCreateTime).reversed())
} .filter(appHomeSC -> filterHomeSubCluster(requestSC, appHomeSC.getHomeSubCluster()))
.limit(maxAppsInStateStore)
.collect(Collectors.toList());
long end = clock.getTime();
opDurations.addGetAppsHomeSubClusterDuration(start, end);
LOG.info("filterSubClusterId = {}, appCount = {}.", requestSC, result.size());
return GetApplicationsHomeSubClusterResponse.newInstance(result);
} catch (Exception e) { } catch (Exception e) {
String errMsg = "Cannot get apps: " + e.getMessage(); String errMsg = "Cannot get apps: " + e.getMessage();
FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
} }
long end = clock.getTime();
opDurations.addGetAppsHomeSubClusterDuration(start, end); throw new YarnException("Cannot get app by request");
return GetApplicationsHomeSubClusterResponse.newInstance(result); }
private ApplicationHomeSubCluster generateAppHomeSC(String appId) {
try {
ApplicationId applicationId = ApplicationId.fromString(appId);
SubClusterId homeSubCluster = getApp(applicationId);
ApplicationHomeSubCluster app =
ApplicationHomeSubCluster.newInstance(applicationId, Time.now(), homeSubCluster);
return app;
} catch (Exception ex) {
LOG.error("get homeSubCluster by appId = {}.", appId);
}
return null;
} }
@Override @Override

View File

@ -51,6 +51,17 @@ public static ApplicationHomeSubCluster newInstance(ApplicationId appId,
return appMapping; return appMapping;
} }
@Private
@Unstable
public static ApplicationHomeSubCluster newInstance(ApplicationId appId, long createTime,
SubClusterId homeSubCluster) {
ApplicationHomeSubCluster appMapping = Records.newRecord(ApplicationHomeSubCluster.class);
appMapping.setApplicationId(appId);
appMapping.setHomeSubCluster(homeSubCluster);
appMapping.setCreateTime(createTime);
return appMapping;
}
/** /**
* Get the {@link ApplicationId} representing the unique identifier of the * Get the {@link ApplicationId} representing the unique identifier of the
* application. * application.
@ -91,6 +102,25 @@ public static ApplicationHomeSubCluster newInstance(ApplicationId appId,
@Unstable @Unstable
public abstract void setHomeSubCluster(SubClusterId homeSubCluster); public abstract void setHomeSubCluster(SubClusterId homeSubCluster);
/**
* Get the create time of the subcluster.
*
* @return the state of the subcluster
*/
@Public
@Unstable
public abstract long getCreateTime();
/**
* Set the create time of the subcluster.
*
* @param time the last heartbeat time of the subcluster
*/
@Private
@Unstable
public abstract void setCreateTime(long time);
@Override @Override
public boolean equals(Object obj) { public boolean equals(Object obj) {
if (this == obj) { if (this == obj) {

View File

@ -17,6 +17,7 @@
package org.apache.hadoop.yarn.server.federation.store.records; package org.apache.hadoop.yarn.server.federation.store.records;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.Records;
@ -37,4 +38,33 @@ public static GetApplicationsHomeSubClusterRequest newInstance() {
return request; return request;
} }
@Private
@Unstable
public static GetApplicationsHomeSubClusterRequest
newInstance(SubClusterId subClusterId) {
GetApplicationsHomeSubClusterRequest request =
Records.newRecord(GetApplicationsHomeSubClusterRequest.class);
request.setSubClusterId(subClusterId);
return request;
}
/**
* Get the {@link SubClusterId} representing the unique identifier of the
* subcluster.
*
* @return the subcluster identifier
*/
@Public
@Unstable
public abstract SubClusterId getSubClusterId();
/**
* Set the {@link SubClusterId} representing the unique identifier of the
* subcluster.
*
* @param subClusterId the subcluster identifier
*/
@Public
@Unstable
public abstract void setSubClusterId(SubClusterId subClusterId);
} }

View File

@ -149,6 +149,16 @@ public void setHomeSubCluster(SubClusterId homeSubCluster) {
this.homeSubCluster = homeSubCluster; this.homeSubCluster = homeSubCluster;
} }
@Override
public long getCreateTime() {
return 0;
}
@Override
public void setCreateTime(long time) {
}
private SubClusterId convertFromProtoFormat(SubClusterIdProto subClusterId) { private SubClusterId convertFromProtoFormat(SubClusterIdProto subClusterId) {
return new SubClusterIdPBImpl(subClusterId); return new SubClusterIdPBImpl(subClusterId);
} }

View File

@ -19,10 +19,13 @@
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos;
import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.GetApplicationsHomeSubClusterRequestProto; import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.GetApplicationsHomeSubClusterRequestProto;
import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.GetApplicationsHomeSubClusterRequestProtoOrBuilder;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterRequest; import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterRequest;
import org.apache.hadoop.thirdparty.protobuf.TextFormat; import org.apache.hadoop.thirdparty.protobuf.TextFormat;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
/** /**
* Protocol buffer based implementation of * Protocol buffer based implementation of
@ -75,4 +78,37 @@ public String toString() {
return TextFormat.shortDebugString(getProto()); return TextFormat.shortDebugString(getProto());
} }
private void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = GetApplicationsHomeSubClusterRequestProto.newBuilder(proto);
}
viaProto = false;
}
@Override
public SubClusterId getSubClusterId() {
GetApplicationsHomeSubClusterRequestProtoOrBuilder p = viaProto ? proto : builder;
if (!p.hasSubClusterId()) {
return null;
}
return convertFromProtoFormat(p.getSubClusterId());
}
@Override
public void setSubClusterId(SubClusterId subClusterId) {
maybeInitBuilder();
if (subClusterId == null) {
builder.clearSubClusterId();
return;
}
builder.setSubClusterId(convertToProtoFormat(subClusterId));
}
private SubClusterId convertFromProtoFormat(YarnServerFederationProtos.SubClusterIdProto sc) {
return new SubClusterIdPBImpl(sc);
}
private YarnServerFederationProtos.SubClusterIdProto convertToProtoFormat(SubClusterId sc) {
return ((SubClusterIdPBImpl) sc).getProto();
}
} }

View File

@ -28,6 +28,7 @@
import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreInvalidInputException; import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreInvalidInputException;
import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreRetriableException; import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreRetriableException;
import org.apache.hadoop.yarn.server.federation.store.metrics.FederationStateStoreClientMetrics; import org.apache.hadoop.yarn.server.federation.store.metrics.FederationStateStoreClientMetrics;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -279,4 +280,30 @@ public static void setPassword(HikariDataSource dataSource, String password) {
LOG.debug("NULL Credentials specified for Store connection, so ignoring"); LOG.debug("NULL Credentials specified for Store connection, so ignoring");
} }
} }
/**
* Filter HomeSubCluster based on Filter SubCluster.
*
* @param filterSubCluster filter query conditions
* @param homeSubCluster homeSubCluster
* @return return true, if match filter conditions,
* return false, if not match filter conditions.
*/
public static boolean filterHomeSubCluster(SubClusterId filterSubCluster,
SubClusterId homeSubCluster) {
// If the filter condition is empty,
// it means that homeSubCluster needs to be added
if (filterSubCluster == null) {
return true;
}
// If the filter condition filterSubCluster is not empty,
// and filterSubCluster is equal to homeSubCluster, it needs to be added
if (filterSubCluster.equals(homeSubCluster)) {
return true;
}
return false;
}
} }

View File

@ -97,6 +97,7 @@ message GetSubClustersInfoResponseProto {
message ApplicationHomeSubClusterProto { message ApplicationHomeSubClusterProto {
optional ApplicationIdProto application_id = 1; optional ApplicationIdProto application_id = 1;
optional SubClusterIdProto home_sub_cluster = 2; optional SubClusterIdProto home_sub_cluster = 2;
optional int64 create_time = 3;
} }
message AddApplicationHomeSubClusterRequestProto { message AddApplicationHomeSubClusterRequestProto {
@ -123,7 +124,7 @@ message GetApplicationHomeSubClusterResponseProto {
} }
message GetApplicationsHomeSubClusterRequestProto { message GetApplicationsHomeSubClusterRequestProto {
optional SubClusterIdProto sub_cluster_id = 1;
} }
message GetApplicationsHomeSubClusterResponseProto { message GetApplicationsHomeSubClusterResponseProto {

View File

@ -87,6 +87,8 @@ public abstract class FederationStateStoreBaseTest {
private static final MonotonicClock CLOCK = new MonotonicClock(); private static final MonotonicClock CLOCK = new MonotonicClock();
private FederationStateStore stateStore; private FederationStateStore stateStore;
private static final int NUM_APPS_10 = 10;
private static final int NUM_APPS_20 = 20;
protected abstract FederationStateStore createStateStore(); protected abstract FederationStateStore createStateStore();
@ -416,6 +418,89 @@ public void testGetApplicationsHomeSubCluster() throws Exception {
Assert.assertTrue(result.getAppsHomeSubClusters().contains(ahsc2)); Assert.assertTrue(result.getAppsHomeSubClusters().contains(ahsc2));
} }
@Test
public void testGetApplicationsHomeSubClusterEmpty() throws Exception {
LambdaTestUtils.intercept(YarnException.class,
"Missing getApplicationsHomeSubCluster request",
() -> stateStore.getApplicationsHomeSubCluster(null));
}
@Test
public void testGetApplicationsHomeSubClusterFilter() throws Exception {
// Add ApplicationHomeSC - SC1
long now = Time.now();
Set<ApplicationHomeSubCluster> appHomeSubClusters = new HashSet<>();
for (int i = 0; i < NUM_APPS_10; i++) {
ApplicationId appId = ApplicationId.newInstance(now, i);
SubClusterId subClusterId = SubClusterId.newInstance("SC1");
addApplicationHomeSC(appId, subClusterId);
ApplicationHomeSubCluster ahsc =
ApplicationHomeSubCluster.newInstance(appId, subClusterId);
appHomeSubClusters.add(ahsc);
}
// Add ApplicationHomeSC - SC2
for (int i = 10; i < NUM_APPS_20; i++) {
ApplicationId appId = ApplicationId.newInstance(now, i);
SubClusterId subClusterId = SubClusterId.newInstance("SC2");
addApplicationHomeSC(appId, subClusterId);
}
GetApplicationsHomeSubClusterRequest getRequest =
GetApplicationsHomeSubClusterRequest.newInstance();
getRequest.setSubClusterId(SubClusterId.newInstance("SC1"));
GetApplicationsHomeSubClusterResponse result =
stateStore.getApplicationsHomeSubCluster(getRequest);
Assert.assertNotNull(result);
List<ApplicationHomeSubCluster> items = result.getAppsHomeSubClusters();
Assert.assertNotNull(items);
Assert.assertEquals(10, items.size());
for (ApplicationHomeSubCluster item : items) {
Assert.assertTrue(appHomeSubClusters.contains(item));
}
}
@Test
public void testGetApplicationsHomeSubClusterLimit() throws Exception {
// Add ApplicationHomeSC - SC1
long now = Time.now();
for (int i = 0; i < 50; i++) {
ApplicationId appId = ApplicationId.newInstance(now, i);
SubClusterId subClusterId = SubClusterId.newInstance("SC1");
addApplicationHomeSC(appId, subClusterId);
}
GetApplicationsHomeSubClusterRequest getRequest =
GetApplicationsHomeSubClusterRequest.newInstance();
getRequest.setSubClusterId(SubClusterId.newInstance("SC1"));
GetApplicationsHomeSubClusterResponse result =
stateStore.getApplicationsHomeSubCluster(getRequest);
Assert.assertNotNull(result);
// Write 50 records, but get 10 records because the maximum number is limited to 10
List<ApplicationHomeSubCluster> items = result.getAppsHomeSubClusters();
Assert.assertNotNull(items);
Assert.assertEquals(10, items.size());
GetApplicationsHomeSubClusterRequest getRequest1 =
GetApplicationsHomeSubClusterRequest.newInstance();
getRequest1.setSubClusterId(SubClusterId.newInstance("SC2"));
GetApplicationsHomeSubClusterResponse result1 =
stateStore.getApplicationsHomeSubCluster(getRequest1);
Assert.assertNotNull(result1);
// SC2 data does not exist, so the number of returned records is 0
List<ApplicationHomeSubCluster> items1 = result1.getAppsHomeSubClusters();
Assert.assertNotNull(items1);
Assert.assertEquals(0, items1.size());
}
@Test @Test
public void testUpdateApplicationHomeSubCluster() throws Exception { public void testUpdateApplicationHomeSubCluster() throws Exception {
ApplicationId appId = ApplicationId.newInstance(1, 1); ApplicationId appId = ApplicationId.newInstance(1, 1);

View File

@ -31,6 +31,7 @@
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -50,6 +51,7 @@ public class HSQLDBFederationStateStore extends SQLFederationStateStore {
" CREATE TABLE applicationsHomeSubCluster (" " CREATE TABLE applicationsHomeSubCluster ("
+ " applicationId varchar(64) NOT NULL," + " applicationId varchar(64) NOT NULL,"
+ " homeSubCluster varchar(256) NOT NULL," + " homeSubCluster varchar(256) NOT NULL,"
+ " createTime datetime NOT NULL,"
+ " CONSTRAINT pk_applicationId PRIMARY KEY (applicationId))"; + " CONSTRAINT pk_applicationId PRIMARY KEY (applicationId))";
private static final String TABLE_MEMBERSHIP = private static final String TABLE_MEMBERSHIP =
@ -149,8 +151,9 @@ public class HSQLDBFederationStateStore extends SQLFederationStateStore {
+ " OUT storedHomeSubCluster_OUT varchar(256), OUT rowCount_OUT int)" + " OUT storedHomeSubCluster_OUT varchar(256), OUT rowCount_OUT int)"
+ " MODIFIES SQL DATA BEGIN ATOMIC" + " MODIFIES SQL DATA BEGIN ATOMIC"
+ " INSERT INTO applicationsHomeSubCluster " + " INSERT INTO applicationsHomeSubCluster "
+ " (applicationId,homeSubCluster) " + " (applicationId,homeSubCluster,createTime) "
+ " (SELECT applicationId_IN, homeSubCluster_IN" + " (SELECT applicationId_IN, homeSubCluster_IN, "
+ " NOW() AT TIME ZONE INTERVAL '0:00' HOUR TO MINUTE"
+ " FROM applicationsHomeSubCluster" + " FROM applicationsHomeSubCluster"
+ " WHERE applicationId = applicationId_IN" + " WHERE applicationId = applicationId_IN"
+ " HAVING COUNT(*) = 0 );" + " HAVING COUNT(*) = 0 );"
@ -179,11 +182,16 @@ public class HSQLDBFederationStateStore extends SQLFederationStateStore {
+ " WHERE applicationId = applicationID_IN; END"; + " WHERE applicationId = applicationID_IN; END";
private static final String SP_GETAPPLICATIONSHOMESUBCLUSTER = private static final String SP_GETAPPLICATIONSHOMESUBCLUSTER =
"CREATE PROCEDURE sp_getApplicationsHomeSubCluster()" "CREATE PROCEDURE sp_getApplicationsHomeSubCluster("
+ "IN limit_IN int, IN homeSubCluster_IN varchar(256))"
+ " MODIFIES SQL DATA DYNAMIC RESULT SETS 1 BEGIN ATOMIC" + " MODIFIES SQL DATA DYNAMIC RESULT SETS 1 BEGIN ATOMIC"
+ " DECLARE result CURSOR FOR" + " DECLARE result CURSOR FOR"
+ " SELECT applicationId, homeSubCluster" + " SELECT applicationId, homeSubCluster, createTime"
+ " FROM applicationsHomeSubCluster; OPEN result; END"; + " FROM applicationsHomeSubCluster "
+ " WHERE ROWNUM() <= limit_IN AND "
+ " (homeSubCluster_IN = '' OR homeSubCluster = homeSubCluster_IN) "
+ " ORDER BY createTime desc; "
+ " OPEN result; END";
private static final String SP_DELETEAPPLICATIONHOMESUBCLUSTER = private static final String SP_DELETEAPPLICATIONHOMESUBCLUSTER =
"CREATE PROCEDURE sp_deleteApplicationHomeSubCluster(" "CREATE PROCEDURE sp_deleteApplicationHomeSubCluster("
@ -315,6 +323,7 @@ public class HSQLDBFederationStateStore extends SQLFederationStateStore {
@Override @Override
public void init(Configuration conf) { public void init(Configuration conf) {
try { try {
conf.setInt(YarnConfiguration.FEDERATION_STATESTORE_MAX_APPLICATIONS, 10);
super.init(conf); super.init(conf);
conn = super.conn; conn = super.conn;

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.federation.store.impl; package org.apache.hadoop.yarn.server.federation.store.impl;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
/** /**
@ -29,6 +30,7 @@ public class TestMemoryFederationStateStore
@Override @Override
protected FederationStateStore createStateStore() { protected FederationStateStore createStateStore() {
Configuration conf = new Configuration(); Configuration conf = new Configuration();
conf.setInt(YarnConfiguration.FEDERATION_STATESTORE_MAX_APPLICATIONS, 10);
super.setConf(conf); super.setConf(conf);
return new MemoryFederationStateStore(); return new MemoryFederationStateStore();
} }

View File

@ -84,6 +84,7 @@ protected FederationStateStore createStateStore() {
DATABASE_PASSWORD); DATABASE_PASSWORD);
conf.set(YarnConfiguration.FEDERATION_STATESTORE_SQL_URL, conf.set(YarnConfiguration.FEDERATION_STATESTORE_SQL_URL,
DATABASE_URL + System.currentTimeMillis()); DATABASE_URL + System.currentTimeMillis());
conf.setInt(YarnConfiguration.FEDERATION_STATESTORE_MAX_APPLICATIONS, 10);
super.setConf(conf); super.setConf(conf);
return new HSQLDBFederationStateStore(); return new HSQLDBFederationStateStore();
} }

View File

@ -68,6 +68,7 @@ public void before() throws IOException, YarnException {
Configuration conf = new YarnConfiguration(); Configuration conf = new YarnConfiguration();
conf.set(CommonConfigurationKeys.ZK_ADDRESS, connectString); conf.set(CommonConfigurationKeys.ZK_ADDRESS, connectString);
conf.setInt(YarnConfiguration.FEDERATION_STATESTORE_MAX_APPLICATIONS, 10);
setConf(conf); setConf(conf);
} catch (Exception e) { } catch (Exception e) {
LOG.error("Cannot initialize ZooKeeper store", e); LOG.error("Cannot initialize ZooKeeper store", e);