YARN-5407. In-memory based implementation of the FederationApplicationStateStore/FederationPolicyStateStore. (Ellen Hui via Subru)

(cherry picked from commit b747d59f41f08dabe4f3a486a2dbd4bed5723867)
This commit is contained in:
Subru Krishnan 2016-08-09 16:07:55 -07:00 committed by Carlo Curino
parent e0c3a44396
commit 0733088c29
14 changed files with 558 additions and 75 deletions

View File

@ -20,34 +20,71 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.store.FederationMembershipStateStore;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse;
import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationResponse;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatResponse;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.util.MonotonicClock;
/**
* In-memory implementation of FederationMembershipStateStore.
* In-memory implementation of {@link FederationStateStore}.
*/
public class MemoryFederationStateStore
implements FederationMembershipStateStore {
public class MemoryFederationStateStore implements FederationStateStore {
private Map<SubClusterId, SubClusterInfo> membership;
private Map<ApplicationId, SubClusterId> applications;
private Map<String, SubClusterPolicyConfiguration> policies;
private final Map<SubClusterId, SubClusterInfo> membership =
new ConcurrentHashMap<SubClusterId, SubClusterInfo>();
private final MonotonicClock clock = new MonotonicClock();
@Override
public void init(Configuration conf) {
membership = new ConcurrentHashMap<SubClusterId, SubClusterInfo>();
applications = new ConcurrentHashMap<ApplicationId, SubClusterId>();
policies = new ConcurrentHashMap<String, SubClusterPolicyConfiguration>();
}
@Override
public void close() {
membership = null;
applications = null;
policies = null;
}
@Override
public SubClusterRegisterResponse registerSubCluster(
SubClusterRegisterRequest request) throws YarnException {
@ -116,4 +153,113 @@ public GetSubClustersInfoResponse getSubClusters(
return GetSubClustersInfoResponse.newInstance(result);
}
// FederationApplicationHomeSubClusterStore methods
@Override
public AddApplicationHomeSubClusterResponse addApplicationHomeSubClusterMap(
AddApplicationHomeSubClusterRequest request) throws YarnException {
ApplicationId appId =
request.getApplicationHomeSubCluster().getApplicationId();
if (applications.containsKey(appId)) {
throw new YarnException("Application " + appId + " already exists");
}
applications.put(appId,
request.getApplicationHomeSubCluster().getHomeSubCluster());
return AddApplicationHomeSubClusterResponse.newInstance();
}
@Override
public UpdateApplicationHomeSubClusterResponse updateApplicationHomeSubClusterMap(
UpdateApplicationHomeSubClusterRequest request) throws YarnException {
ApplicationId appId =
request.getApplicationHomeSubCluster().getApplicationId();
if (!applications.containsKey(appId)) {
throw new YarnException("Application " + appId + " does not exist");
}
applications.put(appId,
request.getApplicationHomeSubCluster().getHomeSubCluster());
return UpdateApplicationHomeSubClusterResponse.newInstance();
}
@Override
public GetApplicationHomeSubClusterResponse getApplicationHomeSubClusterMap(
GetApplicationHomeSubClusterRequest request) throws YarnException {
ApplicationId appId = request.getApplicationId();
if (!applications.containsKey(appId)) {
throw new YarnException("Application " + appId + " does not exist");
}
return GetApplicationHomeSubClusterResponse.newInstance(
ApplicationHomeSubCluster.newInstance(appId, applications.get(appId)));
}
@Override
public GetApplicationsHomeSubClusterResponse getApplicationsHomeSubClusterMap(
GetApplicationsHomeSubClusterRequest request) throws YarnException {
List<ApplicationHomeSubCluster> result =
new ArrayList<ApplicationHomeSubCluster>();
for (Entry<ApplicationId, SubClusterId> e : applications.entrySet()) {
result
.add(ApplicationHomeSubCluster.newInstance(e.getKey(), e.getValue()));
}
GetApplicationsHomeSubClusterResponse.newInstance(result);
return GetApplicationsHomeSubClusterResponse.newInstance(result);
}
@Override
public DeleteApplicationHomeSubClusterResponse deleteApplicationHomeSubClusterMap(
DeleteApplicationHomeSubClusterRequest request) throws YarnException {
ApplicationId appId = request.getApplicationId();
if (!applications.containsKey(appId)) {
throw new YarnException("Application " + appId + " does not exist");
}
applications.remove(appId);
return DeleteApplicationHomeSubClusterResponse.newInstance();
}
@Override
public GetSubClusterPolicyConfigurationResponse getPolicyConfiguration(
GetSubClusterPolicyConfigurationRequest request) throws YarnException {
String queue = request.getQueue();
if (!policies.containsKey(queue)) {
throw new YarnException("Policy for queue " + queue + " does not exist");
}
return GetSubClusterPolicyConfigurationResponse
.newInstance(policies.get(queue));
}
@Override
public SetSubClusterPolicyConfigurationResponse setPolicyConfiguration(
SetSubClusterPolicyConfigurationRequest request) throws YarnException {
policies.put(request.getPolicyConfiguration().getQueue(),
request.getPolicyConfiguration());
return SetSubClusterPolicyConfigurationResponse.newInstance();
}
@Override
public GetSubClusterPoliciesConfigurationsResponse getPoliciesConfigurations(
GetSubClusterPoliciesConfigurationsRequest request) throws YarnException {
ArrayList<SubClusterPolicyConfiguration> result =
new ArrayList<SubClusterPolicyConfiguration>();
for (SubClusterPolicyConfiguration policy : policies.values()) {
result.add(policy);
}
return GetSubClusterPoliciesConfigurationsResponse.newInstance(result);
}
@Override
public Version getCurrentVersion() {
return null;
}
@Override
public Version loadVersion() {
return null;
}
}

View File

@ -29,7 +29,7 @@
@Private
@Unstable
public abstract class GetSubClusterPoliciesConfigurationsRequest {
public GetSubClusterPoliciesConfigurationsRequest newInstance() {
public static GetSubClusterPoliciesConfigurationsRequest newInstance() {
return Records.newRecord(GetSubClusterPoliciesConfigurationsRequest.class);
}
}

View File

@ -36,7 +36,7 @@ public abstract class GetSubClusterPoliciesConfigurationsResponse {
@Private
@Unstable
public GetSubClusterPoliciesConfigurationsResponse newInstance(
public static GetSubClusterPoliciesConfigurationsResponse newInstance(
List<SubClusterPolicyConfiguration> policyConfigurations) {
GetSubClusterPoliciesConfigurationsResponse response =
Records.newRecord(GetSubClusterPoliciesConfigurationsResponse.class);

View File

@ -33,7 +33,8 @@ public abstract class GetSubClusterPolicyConfigurationRequest {
@Private
@Unstable
public GetSubClusterPolicyConfigurationRequest newInstance(String queueName) {
public static GetSubClusterPolicyConfigurationRequest newInstance(
String queueName) {
GetSubClusterPolicyConfigurationRequest request =
Records.newRecord(GetSubClusterPolicyConfigurationRequest.class);
request.setQueue(queueName);

View File

@ -34,7 +34,7 @@ public abstract class GetSubClusterPolicyConfigurationResponse {
@Private
@Unstable
public GetSubClusterPolicyConfigurationResponse newInstance(
public static GetSubClusterPolicyConfigurationResponse newInstance(
SubClusterPolicyConfiguration policy) {
GetSubClusterPolicyConfigurationResponse response =
Records.newRecord(GetSubClusterPolicyConfigurationResponse.class);

View File

@ -32,7 +32,7 @@
public abstract class SetSubClusterPolicyConfigurationRequest {
@Private
@Unstable
public SetSubClusterPolicyConfigurationRequest newInstance(
public static SetSubClusterPolicyConfigurationRequest newInstance(
SubClusterPolicyConfiguration policy) {
SetSubClusterPolicyConfigurationRequest request =
Records.newRecord(SetSubClusterPolicyConfigurationRequest.class);
@ -40,24 +40,6 @@ public SetSubClusterPolicyConfigurationRequest newInstance(
return request;
}
/**
* Get the name of the queue for which we are configuring a policy.
*
* @return the name of the queue
*/
@Public
@Unstable
public abstract String getQueue();
/**
* Sets the name of the queue for which we are configuring a policy.
*
* @param queueName the name of the queue
*/
@Private
@Unstable
public abstract void setQueue(String queueName);
/**
* Get the policy configuration assigned to the queue.
*

View File

@ -30,7 +30,7 @@
@Private
@Unstable
public abstract class SetSubClusterPolicyConfigurationResponse {
public SetSubClusterPolicyConfigurationResponse newInstance() {
public static SetSubClusterPolicyConfigurationResponse newInstance() {
return Records.newRecord(SetSubClusterPolicyConfigurationResponse.class);
}
}

View File

@ -29,8 +29,8 @@
/**
* {@link SubClusterPolicyConfiguration} is a class that represents a
* configuration of a policy. It contains a policy type (resolve to a class
* name) and its params as an opaque {@link ByteBuffer}.
* configuration of a policy. For a single queue, it contains a policy type
* (resolve to a class name) and its params as an opaque {@link ByteBuffer}.
*
* Note: by design the params are an opaque ByteBuffer, this allows for enough
* flexibility to evolve the policies without impacting the protocols to/from
@ -42,15 +42,34 @@ public abstract class SubClusterPolicyConfiguration {
@Private
@Unstable
public static SubClusterPolicyConfiguration newInstance(String policyType,
ByteBuffer policyParams) {
public static SubClusterPolicyConfiguration newInstance(String queue,
String policyType, ByteBuffer policyParams) {
SubClusterPolicyConfiguration policy =
Records.newRecord(SubClusterPolicyConfiguration.class);
policy.setQueue(queue);
policy.setType(policyType);
policy.setParams(policyParams);
return policy;
}
/**
* Get the name of the queue for which we are configuring a policy.
*
* @return the name of the queue
*/
@Public
@Unstable
public abstract String getQueue();
/**
* Sets the name of the queue for which we are configuring a policy.
*
* @param queueName the name of the queue
*/
@Private
@Unstable
public abstract void setQueue(String queueName);
/**
* Get the type of the policy. This could be random, round-robin, load-based,
* etc.

View File

@ -108,6 +108,10 @@ public String toString() {
public ApplicationId getApplicationId() {
GetApplicationHomeSubClusterRequestProtoOrBuilder p =
viaProto ? proto : builder;
if (applicationId != null) {
return applicationId;
}
if (!p.hasApplicationId()) {
return null;
}

View File

@ -106,23 +106,6 @@ public String toString() {
return TextFormat.shortDebugString(getProto());
}
@Override
public String getQueue() {
SetSubClusterPolicyConfigurationRequestProtoOrBuilder p =
viaProto ? proto : builder;
return p.getQueue();
}
@Override
public void setQueue(String queueName) {
maybeInitBuilder();
if (queueName == null) {
builder.clearQueue();
return;
}
builder.setQueue(queueName);
}
@Override
public SubClusterPolicyConfiguration getPolicyConfiguration() {
SetSubClusterPolicyConfigurationRequestProtoOrBuilder p =

View File

@ -86,6 +86,23 @@ public String toString() {
return TextFormat.shortDebugString(getProto());
}
@Override
public String getQueue() {
SubClusterPolicyConfigurationProtoOrBuilder p = viaProto ? proto : builder;
return p.getQueue();
}
@Override
public void setQueue(String queueName) {
maybeInitBuilder();
if (queueName == null) {
builder.clearType();
return;
}
builder.setQueue(queueName);
}
@Override
public String getType() {
SubClusterPolicyConfigurationProtoOrBuilder p = viaProto ? proto : builder;

View File

@ -136,8 +136,9 @@ message DeleteApplicationHomeSubClusterResponseProto {
}
message SubClusterPolicyConfigurationProto {
optional string type = 1;
optional bytes params = 2;
optional string queue = 1;
optional string type = 2;
optional bytes params = 3;
}
message GetSubClusterPolicyConfigurationRequestProto {
@ -149,8 +150,7 @@ message GetSubClusterPolicyConfigurationResponseProto {
}
message SetSubClusterPolicyConfigurationRequestProto {
optional string queue = 1;
optional SubClusterPolicyConfigurationProto policy_configuration = 2;
optional SubClusterPolicyConfigurationProto policy_configuration = 1;
}
message SetSubClusterPolicyConfigurationResponseProto {

View File

@ -18,18 +18,39 @@
package org.apache.hadoop.yarn.server.federation.store.impl;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.store.FederationMembershipStateStore;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationResponse;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterResponse;
import org.apache.hadoop.yarn.util.MonotonicClock;
import org.junit.After;
import org.junit.Assert;
@ -42,20 +63,21 @@
public abstract class FederationStateStoreBaseTest {
private static final MonotonicClock CLOCK = new MonotonicClock();
private FederationStateStore stateStore = createStateStore();
private FederationMembershipStateStore stateStore;
protected abstract FederationStateStore createStateStore();
@Before
public void before() throws IOException {
stateStore = getCleanStateStore();
public void before() throws IOException, YarnException {
stateStore.init(new Configuration());
}
@After
public void after() {
stateStore = null;
public void after() throws Exception {
stateStore.close();
}
protected abstract FederationMembershipStateStore getCleanStateStore();
// Test FederationMembershipStateStore
@Test
public void testRegisterSubCluster() throws Exception {
@ -72,10 +94,7 @@ public void testRegisterSubCluster() throws Exception {
@Test
public void testDeregisterSubCluster() throws Exception {
SubClusterId subClusterId = SubClusterId.newInstance("SC");
SubClusterInfo subClusterInfo = createSubClusterInfo(subClusterId);
stateStore.registerSubCluster(
SubClusterRegisterRequest.newInstance(subClusterInfo));
registerSubCluster(subClusterId);
SubClusterDeregisterRequest deregisterRequest = SubClusterDeregisterRequest
.newInstance(subClusterId, SubClusterState.SC_UNREGISTERED);
@ -105,9 +124,7 @@ public void testGetSubClusterInfo() throws Exception {
SubClusterId subClusterId = SubClusterId.newInstance("SC");
SubClusterInfo subClusterInfo = createSubClusterInfo(subClusterId);
stateStore.registerSubCluster(
SubClusterRegisterRequest.newInstance(subClusterInfo));
registerSubCluster(subClusterId);
GetSubClusterInfoRequest request =
GetSubClusterInfoRequest.newInstance(subClusterId);
@ -167,10 +184,7 @@ public void testGetAllSubClustersInfo() throws Exception {
@Test
public void testSubClusterHeartbeat() throws Exception {
SubClusterId subClusterId = SubClusterId.newInstance("SC");
SubClusterInfo subClusterInfo = createSubClusterInfo(subClusterId);
stateStore.registerSubCluster(
SubClusterRegisterRequest.newInstance(subClusterInfo));
registerSubCluster(subClusterId);
SubClusterHeartbeatRequest heartbeatRequest = SubClusterHeartbeatRequest
.newInstance(subClusterId, SubClusterState.SC_RUNNING, "cabability");
@ -196,6 +210,271 @@ public void testSubClusterHeartbeatUnknownSubCluster() throws Exception {
}
}
// Test FederationApplicationHomeSubClusterStore
@Test
public void testAddApplicationHomeSubClusterMap() throws Exception {
ApplicationId appId = ApplicationId.newInstance(1, 1);
SubClusterId subClusterId = SubClusterId.newInstance("SC");
ApplicationHomeSubCluster ahsc =
ApplicationHomeSubCluster.newInstance(appId, subClusterId);
AddApplicationHomeSubClusterRequest request =
AddApplicationHomeSubClusterRequest.newInstance(ahsc);
AddApplicationHomeSubClusterResponse response =
stateStore.addApplicationHomeSubClusterMap(request);
Assert.assertNotNull(response);
Assert.assertEquals(subClusterId, queryApplicationHomeSC(appId));
}
@Test
public void testAddApplicationHomeSubClusterMapAppAlreadyExists()
throws Exception {
ApplicationId appId = ApplicationId.newInstance(1, 1);
SubClusterId subClusterId1 = SubClusterId.newInstance("SC1");
addApplicationHomeSC(appId, subClusterId1);
SubClusterId subClusterId2 = SubClusterId.newInstance("SC2");
ApplicationHomeSubCluster ahsc2 =
ApplicationHomeSubCluster.newInstance(appId, subClusterId2);
try {
stateStore.addApplicationHomeSubClusterMap(
AddApplicationHomeSubClusterRequest.newInstance(ahsc2));
Assert.fail();
} catch (YarnException e) {
Assert.assertTrue(e.getMessage()
.startsWith("Application " + appId.toString() + " already exists"));
}
Assert.assertEquals(subClusterId1, queryApplicationHomeSC(appId));
}
@Test
public void testDeleteApplicationHomeSubClusterMap() throws Exception {
ApplicationId appId = ApplicationId.newInstance(1, 1);
SubClusterId subClusterId = SubClusterId.newInstance("SC");
addApplicationHomeSC(appId, subClusterId);
DeleteApplicationHomeSubClusterRequest delRequest =
DeleteApplicationHomeSubClusterRequest.newInstance(appId);
DeleteApplicationHomeSubClusterResponse response =
stateStore.deleteApplicationHomeSubClusterMap(delRequest);
Assert.assertNotNull(response);
try {
queryApplicationHomeSC(appId);
Assert.fail();
} catch (YarnException e) {
Assert.assertTrue(e.getMessage()
.startsWith("Application " + appId + " does not exist"));
}
}
@Test
public void testDeleteApplicationHomeSubClusterMapUnknownApp()
throws Exception {
ApplicationId appId = ApplicationId.newInstance(1, 1);
DeleteApplicationHomeSubClusterRequest delRequest =
DeleteApplicationHomeSubClusterRequest.newInstance(appId);
try {
stateStore.deleteApplicationHomeSubClusterMap(delRequest);
Assert.fail();
} catch (YarnException e) {
Assert.assertTrue(e.getMessage()
.startsWith("Application " + appId.toString() + " does not exist"));
}
}
@Test
public void testGetApplicationHomeSubClusterMap() throws Exception {
ApplicationId appId = ApplicationId.newInstance(1, 1);
SubClusterId subClusterId = SubClusterId.newInstance("SC");
addApplicationHomeSC(appId, subClusterId);
GetApplicationHomeSubClusterRequest getRequest =
GetApplicationHomeSubClusterRequest.newInstance(appId);
GetApplicationHomeSubClusterResponse result =
stateStore.getApplicationHomeSubClusterMap(getRequest);
Assert.assertEquals(appId,
result.getApplicationHomeSubCluster().getApplicationId());
Assert.assertEquals(subClusterId,
result.getApplicationHomeSubCluster().getHomeSubCluster());
}
@Test
public void testGetApplicationHomeSubClusterMapUnknownApp() throws Exception {
ApplicationId appId = ApplicationId.newInstance(1, 1);
GetApplicationHomeSubClusterRequest request =
GetApplicationHomeSubClusterRequest.newInstance(appId);
try {
stateStore.getApplicationHomeSubClusterMap(request);
Assert.fail();
} catch (YarnException e) {
Assert.assertTrue(e.getMessage()
.startsWith("Application " + appId.toString() + " does not exist"));
}
}
@Test
public void testGetApplicationsHomeSubClusterMap() throws Exception {
ApplicationId appId1 = ApplicationId.newInstance(1, 1);
SubClusterId subClusterId1 = SubClusterId.newInstance("SC1");
ApplicationHomeSubCluster ahsc1 =
ApplicationHomeSubCluster.newInstance(appId1, subClusterId1);
ApplicationId appId2 = ApplicationId.newInstance(1, 2);
SubClusterId subClusterId2 = SubClusterId.newInstance("SC2");
ApplicationHomeSubCluster ahsc2 =
ApplicationHomeSubCluster.newInstance(appId2, subClusterId2);
addApplicationHomeSC(appId1, subClusterId1);
addApplicationHomeSC(appId2, subClusterId2);
GetApplicationsHomeSubClusterRequest getRequest =
GetApplicationsHomeSubClusterRequest.newInstance();
GetApplicationsHomeSubClusterResponse result =
stateStore.getApplicationsHomeSubClusterMap(getRequest);
Assert.assertEquals(2, result.getAppsHomeSubClusters().size());
Assert.assertTrue(result.getAppsHomeSubClusters().contains(ahsc1));
Assert.assertTrue(result.getAppsHomeSubClusters().contains(ahsc2));
}
@Test
public void testUpdateApplicationHomeSubClusterMap() throws Exception {
ApplicationId appId = ApplicationId.newInstance(1, 1);
SubClusterId subClusterId1 = SubClusterId.newInstance("SC1");
addApplicationHomeSC(appId, subClusterId1);
SubClusterId subClusterId2 = SubClusterId.newInstance("SC2");
ApplicationHomeSubCluster ahscUpdate =
ApplicationHomeSubCluster.newInstance(appId, subClusterId2);
UpdateApplicationHomeSubClusterRequest updateRequest =
UpdateApplicationHomeSubClusterRequest.newInstance(ahscUpdate);
UpdateApplicationHomeSubClusterResponse response =
stateStore.updateApplicationHomeSubClusterMap(updateRequest);
Assert.assertNotNull(response);
Assert.assertEquals(subClusterId2, queryApplicationHomeSC(appId));
}
@Test
public void testUpdateApplicationHomeSubClusterMapUnknownApp()
throws Exception {
ApplicationId appId = ApplicationId.newInstance(1, 1);
SubClusterId subClusterId1 = SubClusterId.newInstance("SC1");
ApplicationHomeSubCluster ahsc =
ApplicationHomeSubCluster.newInstance(appId, subClusterId1);
UpdateApplicationHomeSubClusterRequest updateRequest =
UpdateApplicationHomeSubClusterRequest.newInstance(ahsc);
try {
stateStore.updateApplicationHomeSubClusterMap((updateRequest));
Assert.fail();
} catch (YarnException e) {
Assert.assertTrue(e.getMessage()
.startsWith("Application " + appId.toString() + " does not exist"));
}
}
// Test FederationPolicyStore
@Test
public void testSetPolicyConfiguration() throws Exception {
SetSubClusterPolicyConfigurationRequest request =
SetSubClusterPolicyConfigurationRequest
.newInstance(createSCPolicyConf("Queue", "PolicyType"));
SetSubClusterPolicyConfigurationResponse result =
stateStore.setPolicyConfiguration(request);
Assert.assertNotNull(result);
Assert.assertEquals(createSCPolicyConf("Queue", "PolicyType"),
queryPolicy("Queue"));
}
@Test
public void testSetPolicyConfigurationUpdateExisting() throws Exception {
setPolicyConf("Queue", "PolicyType1");
SetSubClusterPolicyConfigurationRequest request2 =
SetSubClusterPolicyConfigurationRequest
.newInstance(createSCPolicyConf("Queue", "PolicyType2"));
SetSubClusterPolicyConfigurationResponse result =
stateStore.setPolicyConfiguration(request2);
Assert.assertNotNull(result);
Assert.assertEquals(createSCPolicyConf("Queue", "PolicyType2"),
queryPolicy("Queue"));
}
@Test
public void testGetPolicyConfiguration() throws Exception {
setPolicyConf("Queue", "PolicyType");
GetSubClusterPolicyConfigurationRequest getRequest =
GetSubClusterPolicyConfigurationRequest.newInstance("Queue");
GetSubClusterPolicyConfigurationResponse result =
stateStore.getPolicyConfiguration(getRequest);
Assert.assertNotNull(result);
Assert.assertEquals(createSCPolicyConf("Queue", "PolicyType"),
result.getPolicyConfiguration());
}
@Test
public void testGetPolicyConfigurationUnknownQueue() throws Exception {
GetSubClusterPolicyConfigurationRequest request =
GetSubClusterPolicyConfigurationRequest.newInstance("Queue");
try {
stateStore.getPolicyConfiguration(request);
Assert.fail();
} catch (YarnException e) {
Assert.assertTrue(
e.getMessage().startsWith("Policy for queue Queue does not exist"));
}
}
@Test
public void testGetPoliciesConfigurations() throws Exception {
setPolicyConf("Queue1", "PolicyType1");
setPolicyConf("Queue2", "PolicyType2");
GetSubClusterPoliciesConfigurationsResponse response =
stateStore.getPoliciesConfigurations(
GetSubClusterPoliciesConfigurationsRequest.newInstance());
Assert.assertNotNull(response);
Assert.assertNotNull(response.getPoliciesConfigs());
Assert.assertEquals(2, response.getPoliciesConfigs().size());
Assert.assertTrue(response.getPoliciesConfigs()
.contains(createSCPolicyConf("Queue1", "PolicyType1")));
Assert.assertTrue(response.getPoliciesConfigs()
.contains(createSCPolicyConf("Queue2", "PolicyType2")));
}
// Convenience methods
private SubClusterInfo createSubClusterInfo(SubClusterId subClusterId) {
String amRMAddress = "1.2.3.4:1";
@ -208,6 +487,37 @@ private SubClusterInfo createSubClusterInfo(SubClusterId subClusterId) {
CLOCK.getTime(), "cabability");
}
private SubClusterPolicyConfiguration createSCPolicyConf(String queueName,
String policyType) {
return SubClusterPolicyConfiguration.newInstance(queueName, policyType,
ByteBuffer.allocate(1));
}
private void addApplicationHomeSC(ApplicationId appId,
SubClusterId subClusterId) throws YarnException {
ApplicationHomeSubCluster ahsc =
ApplicationHomeSubCluster.newInstance(appId, subClusterId);
AddApplicationHomeSubClusterRequest request =
AddApplicationHomeSubClusterRequest.newInstance(ahsc);
stateStore.addApplicationHomeSubClusterMap(request);
}
private void setPolicyConf(String queue, String policyType)
throws YarnException {
SetSubClusterPolicyConfigurationRequest request =
SetSubClusterPolicyConfigurationRequest
.newInstance(createSCPolicyConf(queue, policyType));
stateStore.setPolicyConfiguration(request);
}
private void registerSubCluster(SubClusterId subClusterId)
throws YarnException {
SubClusterInfo subClusterInfo = createSubClusterInfo(subClusterId);
stateStore.registerSubCluster(
SubClusterRegisterRequest.newInstance(subClusterInfo));
}
private SubClusterInfo querySubClusterInfo(SubClusterId subClusterId)
throws YarnException {
GetSubClusterInfoRequest request =
@ -215,4 +525,25 @@ private SubClusterInfo querySubClusterInfo(SubClusterId subClusterId)
return stateStore.getSubCluster(request).getSubClusterInfo();
}
private SubClusterId queryApplicationHomeSC(ApplicationId appId)
throws YarnException {
GetApplicationHomeSubClusterRequest request =
GetApplicationHomeSubClusterRequest.newInstance(appId);
GetApplicationHomeSubClusterResponse response =
stateStore.getApplicationHomeSubClusterMap(request);
return response.getApplicationHomeSubCluster().getHomeSubCluster();
}
private SubClusterPolicyConfiguration queryPolicy(String queue)
throws YarnException {
GetSubClusterPolicyConfigurationRequest request =
GetSubClusterPolicyConfigurationRequest.newInstance(queue);
GetSubClusterPolicyConfigurationResponse result =
stateStore.getPolicyConfiguration(request);
return result.getPolicyConfiguration();
}
}

View File

@ -17,7 +17,7 @@
package org.apache.hadoop.yarn.server.federation.store.impl;
import org.apache.hadoop.yarn.server.federation.store.FederationMembershipStateStore;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
/**
* Unit tests for MemoryFederationStateStore.
@ -26,7 +26,7 @@ public class TestMemoryFederationStateStore
extends FederationStateStoreBaseTest {
@Override
protected FederationMembershipStateStore getCleanStateStore() {
protected FederationStateStore createStateStore() {
return new MemoryFederationStateStore();
}
}