diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/AbstractAMRMProxyPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/AbstractAMRMProxyPolicy.java index e853744e10..07cd6db190 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/AbstractAMRMProxyPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/AbstractAMRMProxyPolicy.java @@ -20,9 +20,12 @@ import java.util.Map; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.federation.policies.AbstractConfigurableFederationPolicy; import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo; import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo; /** @@ -44,4 +47,9 @@ public void validate(WeightedPolicyInfo newPolicyInfo) } } + @Override + public void notifyOfResponse(SubClusterId subClusterId, + AllocateResponse response) throws YarnException { + // By default, a stateless policy does not care about responses + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/BroadcastAMRMProxyPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/BroadcastAMRMProxyPolicy.java index 7fddb8ea95..eb83baa844 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/BroadcastAMRMProxyPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/BroadcastAMRMProxyPolicy.java @@ -22,7 +22,6 @@ import java.util.List; import java.util.Map; -import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext; @@ -65,10 +64,4 @@ public Map> splitResourceRequests( return answer; } - @Override - public void notifyOfResponse(SubClusterId subClusterId, - AllocateResponse response) throws YarnException { - // stateless policy does not care about responses - } - } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/HomeAMRMProxyPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/HomeAMRMProxyPolicy.java new file mode 100644 index 0000000000..5dd5c53188 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/HomeAMRMProxyPolicy.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.federation.policies.amrmproxy; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext; +import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContextValidator; +import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException; +import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; + +/** + * An implementation of the {@link FederationAMRMProxyPolicy} that simply + * sends the {@link ResourceRequest} to the home subcluster. + */ +public class HomeAMRMProxyPolicy extends AbstractAMRMProxyPolicy { + + /** Identifier of the local subcluster. */ + private SubClusterId homeSubcluster; + + @Override + public void reinitialize( + FederationPolicyInitializationContext policyContext) + throws FederationPolicyInitializationException { + + FederationPolicyInitializationContextValidator + .validate(policyContext, this.getClass().getCanonicalName()); + setPolicyContext(policyContext); + + this.homeSubcluster = policyContext.getHomeSubcluster(); + } + + @Override + public Map> splitResourceRequests( + List resourceRequests) throws YarnException { + + if (homeSubcluster == null) { + throw new FederationPolicyException("No home subcluster available"); + } + + Map active = getActiveSubclusters(); + if (!active.containsKey(homeSubcluster)) { + throw new FederationPolicyException( + "The local subcluster " + homeSubcluster + " is not active"); + } + + List resourceRequestsCopy = + new ArrayList<>(resourceRequests); + return Collections.singletonMap(homeSubcluster, resourceRequestsCopy); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/RejectAMRMProxyPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/RejectAMRMProxyPolicy.java index 450060671c..bed037e463 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/RejectAMRMProxyPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/RejectAMRMProxyPolicy.java @@ -21,7 +21,6 @@ import java.util.List; import java.util.Map; -import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext; @@ -53,11 +52,4 @@ public Map> splitResourceRequests( + "rejects all routing requests by construction."); } - @Override - public void notifyOfResponse(SubClusterId subClusterId, - AllocateResponse response) throws YarnException { - // This might be invoked for applications started with a previous policy, - // do nothing for this policy. - } - } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/HomePolicyManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/HomePolicyManager.java new file mode 100644 index 0000000000..93aa248864 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/HomePolicyManager.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.yarn.server.federation.policies.manager; + +import java.nio.ByteBuffer; +import java.util.Collections; + +import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.HomeAMRMProxyPolicy; +import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo; +import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; +import org.apache.hadoop.yarn.server.federation.policies.router.UniformRandomRouterPolicy; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration; + +/** + * Policy manager which uses the {@link UniformRandomRouterPolicy} for the + * Router and {@link HomeAMRMProxyPolicy} as the AMRMProxy policy to find the + * RM. + */ +public class HomePolicyManager extends AbstractPolicyManager { + + /** Imaginary configuration to fulfill the super class. */ + private WeightedPolicyInfo weightedPolicyInfo; + + public HomePolicyManager() { + + weightedPolicyInfo = new WeightedPolicyInfo(); + weightedPolicyInfo.setRouterPolicyWeights( + Collections.singletonMap(new SubClusterIdInfo(""), 1.0f)); + weightedPolicyInfo.setAMRMPolicyWeights( + Collections.singletonMap(new SubClusterIdInfo(""), 1.0f)); + + // Hard-codes two compatible policies for Router and AMRMProxy. + routerFederationPolicy = UniformRandomRouterPolicy.class; + amrmProxyFederationPolicy = HomeAMRMProxyPolicy.class; + } + + @Override + public SubClusterPolicyConfiguration serializeConf() + throws FederationPolicyInitializationException { + + ByteBuffer buf = weightedPolicyInfo.toByteBuffer(); + return SubClusterPolicyConfiguration.newInstance( + getQueue(), this.getClass().getCanonicalName(), buf); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestHomeAMRMProxyPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestHomeAMRMProxyPolicy.java new file mode 100644 index 0000000000..90a6aeb595 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestHomeAMRMProxyPolicy.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.federation.policies.amrmproxy; + +import static org.apache.hadoop.yarn.server.federation.utils.FederationPoliciesTestUtil.createResourceRequests; +import static org.apache.hadoop.yarn.server.federation.utils.FederationPoliciesTestUtil.initializePolicyContext; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.policies.BaseFederationPoliciesTest; +import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo; +import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState; +import org.junit.Before; +import org.junit.Test; + +/** + * Simple test class for the {@link HomeAMRMProxyPolicy}. + */ +public class TestHomeAMRMProxyPolicy extends BaseFederationPoliciesTest { + + private static final int NUM_SUBCLUSTERS = 4; + + private static final String HOME_SC_NAME = "sc2"; + private static final SubClusterId HOME_SC_ID = + SubClusterId.newInstance(HOME_SC_NAME); + + @Before + public void setUp() throws Exception { + setPolicy(new HomeAMRMProxyPolicy()); + // needed for base test to work + setPolicyInfo(mock(WeightedPolicyInfo.class)); + + for (int i = 0; i < NUM_SUBCLUSTERS; i++) { + SubClusterIdInfo sc = new SubClusterIdInfo("sc" + i); + SubClusterInfo sci = mock(SubClusterInfo.class); + when(sci.getState()).thenReturn(SubClusterState.SC_RUNNING); + when(sci.getSubClusterId()).thenReturn(sc.toId()); + getActiveSubclusters().put(sc.toId(), sci); + } + + initializePolicyContext(getPolicy(), mock(WeightedPolicyInfo.class), + getActiveSubclusters(), HOME_SC_NAME); + } + + @Test + public void testSplitAllocateRequest() throws YarnException { + + // Verify the request only goes to the home subcluster + String[] hosts = new String[] {"host0", "host1", "host2", "host3"}; + List resourceRequests = createResourceRequests( + hosts, 2 * 1024, 2, 1, 3, null, false); + + HomeAMRMProxyPolicy federationPolicy = + (HomeAMRMProxyPolicy)getPolicy(); + Map> response = + federationPolicy.splitResourceRequests(resourceRequests); + assertEquals(1, response.size()); + assertNotNull(response.get(HOME_SC_ID)); + assertEquals(9, response.get(HOME_SC_ID).size()); + } + + @Test + public void testHomeSubclusterNotActive() throws YarnException { + + // We setup the home subcluster to a non-existing one + initializePolicyContext(getPolicy(), mock(WeightedPolicyInfo.class), + getActiveSubclusters(), "badsc"); + + // Verify the request fails because the home subcluster is not available + try { + String[] hosts = new String[] {"host0", "host1", "host2", "host3"}; + List resourceRequests = createResourceRequests( + hosts, 2 * 1024, 2, 1, 3, null, false); + HomeAMRMProxyPolicy federationPolicy = (HomeAMRMProxyPolicy)getPolicy(); + federationPolicy.splitResourceRequests(resourceRequests); + fail("It should fail when the home subcluster is not active"); + } catch(FederationPolicyException e) { + GenericTestUtils.assertExceptionContains("is not active", e); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/manager/TestHomePolicyManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/manager/TestHomePolicyManager.java new file mode 100644 index 0000000000..65e03217b9 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/manager/TestHomePolicyManager.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.yarn.server.federation.policies.manager; + +import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.HomeAMRMProxyPolicy; +import org.apache.hadoop.yarn.server.federation.policies.router.UniformRandomRouterPolicy; +import org.junit.Before; + +/** + * Simple test of {@link HomePolicyManager}. + */ +public class TestHomePolicyManager extends BasePolicyManagerTest { + + @Before + public void setup() { + + wfp = new HomePolicyManager(); + + //set expected params that the base test class will use for tests + expectedPolicyManager = HomePolicyManager.class; + expectedAMRMProxyPolicy = HomeAMRMProxyPolicy.class; + expectedRouterPolicy = UniformRandomRouterPolicy.class; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationPoliciesTestUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationPoliciesTestUtil.java index acc14dd9f7..24399cb722 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationPoliciesTestUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationPoliciesTestUtil.java @@ -140,11 +140,21 @@ public static void initializePolicyContext( public static void initializePolicyContext( ConfigurableFederationPolicy policy, - WeightedPolicyInfo policyInfo, Map activeSubclusters) throws YarnException { + WeightedPolicyInfo policyInfo, + Map activeSubclusters) + throws YarnException { + initializePolicyContext( + policy, policyInfo, activeSubclusters, "homesubcluster"); + } + + public static void initializePolicyContext( + ConfigurableFederationPolicy policy, + WeightedPolicyInfo policyInfo, + Map activeSubclusters, + String subclusterId) throws YarnException { FederationPolicyInitializationContext context = new FederationPolicyInitializationContext(null, initResolver(), - initFacade(), SubClusterId.newInstance("homesubcluster")); + initFacade(), SubClusterId.newInstance(subclusterId)); initializePolicyContext(context, policy, policyInfo, activeSubclusters); }