YARN-11358. [Federation] Add FederationInterceptor#allow-partial-result config. (#5056)

slfan1989 2022-12-15 06:37:56 +08:00 committed by GitHub
parent aaf92fe183
commit 6172c3192d
5 changed files with 108 additions and 7 deletions

YarnConfiguration.java

@@ -4294,6 +4294,11 @@ public static boolean isAclEnabled(Configuration conf) {
ROUTER_PREFIX + "webapp.cross-origin.enabled";
public static final boolean DEFAULT_ROUTER_WEBAPP_ENABLE_CORS_FILTER = false;
/** Whether the Router interceptor is allowed to return partial SubCluster results. */
public static final String ROUTER_INTERCEPTOR_ALLOW_PARTIAL_RESULT_ENABLED =
ROUTER_PREFIX + "interceptor.allow-partial-result.enable";
public static final boolean DEFAULT_ROUTER_INTERCEPTOR_ALLOW_PARTIAL_RESULT_ENABLED = false;
////////////////////////////////
// CSI Volume configs
////////////////////////////////

yarn-default.xml

@@ -5085,4 +5085,18 @@
</description>
</property>
<property>
<name>yarn.router.interceptor.allow-partial-result.enable</name>
<value>false</value>
<description>
Whether the interceptor is allowed to return partial SubCluster results.
If true, exceptions from individual subClusters are ignored during the
call and the results that were gathered are returned.
If false, an exception in any subCluster is thrown directly.
Default is false.
</description>
</property>
</configuration>

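As a usage illustration, the new flag is toggled through the standard Configuration API. A minimal sketch (only the two YarnConfiguration constants come from this patch; the wrapper class and main method are illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class AllowPartialResultExample {
  public static void main(String[] args) {
    Configuration conf = new YarnConfiguration();
    // Opt in: tolerate per-SubCluster failures and return partial results.
    conf.setBoolean(
        YarnConfiguration.ROUTER_INTERCEPTOR_ALLOW_PARTIAL_RESULT_ENABLED, true);
    // The shipped default is false, i.e. fail fast on any SubCluster error.
    boolean allowPartial = conf.getBoolean(
        YarnConfiguration.ROUTER_INTERCEPTOR_ALLOW_PARTIAL_RESULT_ENABLED,
        YarnConfiguration.DEFAULT_ROUTER_INTERCEPTOR_ALLOW_PARTIAL_RESULT_ENABLED);
    System.out.println("allow-partial-result = " + allowPartial);
  }
}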
FederationInterceptorREST.java

@@ -140,6 +140,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
private boolean returnPartialReport;
private boolean appInfosCacheEnabled;
private int appInfosCacheCount;
private boolean allowPartialResult;
private long submitIntervalTime;
private Map<SubClusterId, DefaultRequestInterceptorREST> interceptors;
@@ -194,6 +195,10 @@ public void init(String user) {
appInfosCaches = new LRUCacheHashMap<>(appInfosCacheCount, true);
}
allowPartialResult = conf.getBoolean(
YarnConfiguration.ROUTER_INTERCEPTOR_ALLOW_PARTIAL_RESULT_ENABLED,
YarnConfiguration.DEFAULT_ROUTER_INTERCEPTOR_ALLOW_PARTIAL_RESULT_ENABLED);
submitIntervalTime = conf.getTimeDuration(
YarnConfiguration.ROUTER_CLIENTRM_SUBMIT_INTERVAL_TIME,
YarnConfiguration.DEFAULT_CLIENTRM_SUBMIT_INTERVAL_TIME, TimeUnit.MILLISECONDS);
@@ -975,10 +980,13 @@ public NodesInfo getNodes(String states) {
});
} catch (NotFoundException e) {
LOG.error("get all active sub cluster(s) error.", e);
throw e;
} catch (YarnException e) {
LOG.error("getNodes by states = {} error.", states, e);
throw new YarnRuntimeException(e);
} catch (IOException e) {
LOG.error("getNodes by states = {} error with io error.", states, e);
throw new YarnRuntimeException(e);
}
// Delete duplicate from all the node reports got from all the available // Delete duplicate from all the node reports got from all the available
@@ -2070,9 +2078,10 @@ private <R> Map<SubClusterInfo, R> invokeConcurrent(Collection<SubClusterInfo> c
Map<SubClusterInfo, R> results = new HashMap<>();
// If a SubCluster access error occurs, the user configuration decides
// whether the exception is thrown or partial results are returned.
// Send the requests in parallel.
CompletionService<Pair<R, Exception>> compSvc = new ExecutorCompletionService<>(threadpool);
// This part of the code should be able to expose the accessed Exception information.
// We use Pair to store related information. The left value of the Pair is the response,
@@ -2105,9 +2114,10 @@ private <R> Map<SubClusterInfo, R> invokeConcurrent(Collection<SubClusterInfo> c
if (response != null) {
results.put(clusterId, response);
}
Exception exception = pair.getRight();
// If allowPartialResult is false and an exception occurred in a subCluster,
// the exception is thrown directly.
if (!allowPartialResult && exception != null) {
throw exception;
}
} catch (Throwable e) {
@@ -2178,4 +2188,9 @@ private SubClusterInfo getHomeSubClusterInfoByReservationId(String resId)
public LRUCacheHashMap<RouterAppInfoCacheKey, AppsInfo> getAppInfosCaches() {
return appInfosCaches;
}
@VisibleForTesting
public void setAllowPartialResult(boolean allowPartialResult) {
this.allowPartialResult = allowPartialResult;
}
}

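Taken out of the Router code, the gating pattern that invokeConcurrent applies can be sketched in isolation. This is a simplified, self-contained illustration, not the actual Hadoop method: the helper name, its Function-based signature, and the plain ExecutorService are assumptions; only the Pair<R, Exception> convention and the allowPartialResult check mirror the patch.

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import org.apache.commons.lang3.tuple.Pair;

public final class PartialResultSketch {

  // Each sub-cluster call returns Pair<result, exception>, so worker-thread
  // failures travel back to the caller instead of being lost.
  static <R> List<R> invokeAll(Collection<String> subClusters,
      Function<String, R> call, boolean allowPartialResult,
      ExecutorService pool) throws Exception {
    CompletionService<Pair<R, Exception>> compSvc =
        new ExecutorCompletionService<>(pool);
    for (String subCluster : subClusters) {
      compSvc.submit(() -> {
        try {
          return Pair.of(call.apply(subCluster), (Exception) null);
        } catch (Exception e) {
          return Pair.of((R) null, e);
        }
      });
    }
    List<R> results = new ArrayList<>();
    for (int i = 0; i < subClusters.size(); i++) {
      Pair<R, Exception> pair = compSvc.take().get();
      if (pair.getLeft() != null) {
        results.add(pair.getLeft());
      }
      // allowPartialResult = false: rethrow the first sub-cluster failure.
      // allowPartialResult = true: skip it and keep whatever was gathered.
      if (!allowPartialResult && pair.getRight() != null) {
        throw pair.getRight();
      }
    }
    return results;
  }
}

With allowPartialResult = true the loop simply drops failed subClusters, which is the behavior the new retry tests below rely on.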
TestFederationInterceptorREST.java

@@ -749,6 +749,7 @@ public void testGetLabelsOnNode() throws Exception {
Assert.assertTrue(nodeLabelsName.contains("y"));
// null request
interceptor.setAllowPartialResult(false);
NodeLabelsInfo nodeLabelsInfo2 = interceptor.getLabelsOnNode(null, "node2");
Assert.assertNotNull(nodeLabelsInfo2);
Assert.assertEquals(0, nodeLabelsInfo2.getNodeLabelsName().size());
@@ -1183,6 +1184,8 @@ public void testWebAddressWithScheme() {
@Test
public void testCheckUserAccessToQueue() throws Exception {
interceptor.setAllowPartialResult(false);
// Case 1: Only queue admin user can access other user's information
HttpServletRequest mockHsr = mockHttpServletRequestByUserName("non-admin");
String errorMsg1 = "User=non-admin doesn't haven access to queue=queue " +
@@ -1212,6 +1215,8 @@ public void testCheckUserAccessToQueue() throws Exception {
// Case 5: get OK only for SUBMIT_APP acl for "yarn" user
checkUserAccessToQueueFailed("queue", "yarn", QueueACL.ADMINISTER_QUEUE, "admin");
checkUserAccessToQueueSuccess("queue", "yarn", QueueACL.SUBMIT_APPLICATIONS, "admin");
interceptor.setAllowPartialResult(true);
}
private void checkUserAccessToQueueSuccess(String queue, String userName,

TestFederationInterceptorRESTRetry.java

@@ -25,6 +25,7 @@
import javax.ws.rs.core.Response;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.test.LambdaTestUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -42,6 +43,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NewApplication;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodesInfo;
import org.apache.hadoop.yarn.server.router.clientrm.PassThroughClientRequestInterceptor;
import org.apache.hadoop.yarn.server.router.clientrm.TestableFederationClientInterceptor;
import org.apache.hadoop.yarn.webapp.NotFoundException;
@@ -81,10 +83,16 @@ public class TestFederationInterceptorRESTRetry
@Override
public void setUp() {
super.setUpConfig();
Configuration conf = this.getConf();
// For compatibility with the historical test cases, we set
// yarn.router.interceptor.allow-partial-result.enable to false here.
conf.setBoolean(YarnConfiguration.ROUTER_INTERCEPTOR_ALLOW_PARTIAL_RESULT_ENABLED, false);
interceptor = new TestableFederationInterceptorREST();
stateStore = new MemoryFederationStateStore();
stateStore.init(conf);
FederationStateStoreFacade.getInstance().reinitialize(stateStore,
getConf());
stateStoreUtil = new FederationStateStoreTestUtil(stateStore);
@@ -516,4 +524,58 @@ private void checkEmptyMetrics(ClusterMetricsInfo response) {
Assert.assertEquals(0, response.getActiveNodes());
Assert.assertEquals(0, response.getShutdownNodes());
}
@Test
public void testGetNodesOneBadSCAllowPartial() throws Exception {
// We set allowPartialResult to true.
// In this test case, we set up one subCluster whose status is bad,
// so we can't get a response from it. No exception should be thrown;
// an empty result is returned instead.
interceptor.setAllowPartialResult(true);
setupCluster(Arrays.asList(bad2));
NodesInfo nodesInfo = interceptor.getNodes(null);
Assert.assertNotNull(nodesInfo);
// Reset allowPartialResult to false for the other test cases.
interceptor.setAllowPartialResult(false);
}
@Test
public void testGetNodesTwoBadSCsAllowPartial() throws Exception {
// We set allowPartialResult to true.
// In this test case, we set up 2 subClusters whose status is bad.
// When we call the interface, no exception should be thrown;
// an empty result is returned instead.
interceptor.setAllowPartialResult(true);
setupCluster(Arrays.asList(bad1, bad2));
NodesInfo nodesInfo = interceptor.getNodes(null);
Assert.assertNotNull(nodesInfo);
// Reset allowPartialResult to false for the other test cases.
interceptor.setAllowPartialResult(false);
}
@Test
public void testGetNodesOneBadOneGoodAllowPartial() throws Exception {
// allowPartialResult = true:
// we tolerate the failing subCluster and return the partial result.
interceptor.setAllowPartialResult(true);
setupCluster(Arrays.asList(good, bad2));
NodesInfo response = interceptor.getNodes(null);
Assert.assertNotNull(response);
Assert.assertEquals(1, response.getNodes().size());
// Check that the only node came from the good SubCluster.
Assert.assertEquals(good.getId(),
Long.toString(response.getNodes().get(0).getLastHealthUpdate()));
// allowPartialResult = false:
// we do not tolerate failures; exceptions are thrown directly.
interceptor.setAllowPartialResult(false);
setupCluster(Arrays.asList(good, bad2));
}
}
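
A natural companion case, not part of this patch, would assert the fail-fast path. A hedged sketch (the test name is invented, and the exact exception type is an assumption based on the YarnRuntimeException rethrow added in FederationInterceptorREST#getNodes; an import of YarnRuntimeException would also be needed):

@Test
public void testGetNodesOneBadSCDisallowPartial() throws Exception {
  // allowPartialResult = false: the bad SubCluster's failure should
  // surface instead of being swallowed into an empty result.
  interceptor.setAllowPartialResult(false);
  setupCluster(Arrays.asList(bad2));
  LambdaTestUtils.intercept(YarnRuntimeException.class,
      () -> interceptor.getNodes(null));
}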