From e994635a9580ca99c1f3408a6a6a98360c3c17ec Mon Sep 17 00:00:00 2001 From: slfan1989 <55643692+slfan1989@users.noreply.github.com> Date: Fri, 29 Jul 2022 02:53:04 +0800 Subject: [PATCH] YARN-11212. [Federation] Add getNodeToLabels REST APIs for Router. (#4614) --- .../webapp/dao/NodeLabelsInfo.java | 18 ++- .../webapp/dao/NodeToLabelsInfo.java | 10 ++ .../webapp/FederationInterceptorREST.java | 104 ++++++++---------- .../router/webapp/RouterWebServiceUtil.java | 39 ++++++- .../MockDefaultRequestInterceptorREST.java | 18 +++ .../webapp/TestFederationInterceptorREST.java | 21 ++++ 6 files changed, 141 insertions(+), 69 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/NodeLabelsInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/NodeLabelsInfo.java index 2c3a8a507a..c9809b6d2e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/NodeLabelsInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/NodeLabelsInfo.java @@ -32,8 +32,7 @@ public class NodeLabelsInfo { @XmlElement(name = "nodeLabelInfo") - private ArrayList nodeLabelsInfo = - new ArrayList(); + private ArrayList nodeLabelsInfo = new ArrayList<>(); public NodeLabelsInfo() { // JAXB needs this @@ -44,25 +43,32 @@ public NodeLabelsInfo(ArrayList nodeLabels) { } public NodeLabelsInfo(List nodeLabels) { - this.nodeLabelsInfo = new ArrayList(); + this.nodeLabelsInfo = new ArrayList<>(); for (NodeLabel label : nodeLabels) { this.nodeLabelsInfo.add(new NodeLabelInfo(label)); } } public NodeLabelsInfo(Set nodeLabelsName) { - 
this.nodeLabelsInfo = new ArrayList(); + this.nodeLabelsInfo = new ArrayList<>(); for (String labelName : nodeLabelsName) { this.nodeLabelsInfo.add(new NodeLabelInfo(labelName)); } } + public NodeLabelsInfo(Collection nodeLabels) { + this.nodeLabelsInfo = new ArrayList<>(); + nodeLabels.stream().forEach(nodeLabel -> { + this.nodeLabelsInfo.add(new NodeLabelInfo(nodeLabel)); + }); + } + public ArrayList getNodeLabelsInfo() { return nodeLabelsInfo; } public Set getNodeLabels() { - Set nodeLabels = new HashSet(); + Set nodeLabels = new HashSet<>(); for (NodeLabelInfo label : nodeLabelsInfo) { nodeLabels.add(NodeLabel.newInstance(label.getName(), label.getExclusivity())); @@ -71,7 +77,7 @@ public Set getNodeLabels() { } public List getNodeLabelsName() { - ArrayList nodeLabelsName = new ArrayList(); + ArrayList nodeLabelsName = new ArrayList<>(); for (NodeLabelInfo label : nodeLabelsInfo) { nodeLabelsName.add(label.getName()); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/NodeToLabelsInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/NodeToLabelsInfo.java index 0b6e4bc868..e9044b9397 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/NodeToLabelsInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/NodeToLabelsInfo.java @@ -35,7 +35,17 @@ public NodeToLabelsInfo() { // JAXB needs this } + public NodeToLabelsInfo(HashMap nodeToLabels) { + if (nodeToLabels != null) { + this.nodeToLabels.putAll(nodeToLabels); + } + } + public HashMap getNodeToLabels() { return nodeToLabels; } + + public void 
setNodeToLabels(HashMap nodeToLabels) { + this.nodeToLabels = nodeToLabels; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java index 690715c9b3..a1094de749 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java @@ -282,7 +282,7 @@ public Response createNewApplication(HttpServletRequest hsr) .entity(e.getLocalizedMessage()).build(); } - List blacklist = new ArrayList(); + List blacklist = new ArrayList<>(); for (int i = 0; i < numSubmitRetries; ++i) { @@ -295,7 +295,7 @@ public Response createNewApplication(HttpServletRequest hsr) .entity(e.getLocalizedMessage()).build(); } - LOG.debug("getNewApplication try #{} on SubCluster {}", i, subClusterId); + LOG.debug("getNewApplication try #{} on SubCluster {}.", i, subClusterId); DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(subClusterId, @@ -304,7 +304,7 @@ public Response createNewApplication(HttpServletRequest hsr) try { response = interceptor.createNewApplication(hsr); } catch (Exception e) { - LOG.warn("Unable to create a new ApplicationId in SubCluster {}", + LOG.warn("Unable to create a new ApplicationId in SubCluster {}.", subClusterId.getId(), e); } @@ -424,7 +424,7 @@ public Response submitApplication(ApplicationSubmissionContextInfo newApp, .build(); } - List blacklist = new ArrayList(); + List blacklist = new ArrayList<>(); for (int i = 0; i < numSubmitRetries; ++i) { @@ -441,7 +441,7 @@ 
public Response submitApplication(ApplicationSubmissionContextInfo newApp, .entity(e.getLocalizedMessage()) .build(); } - LOG.info("submitApplication appId {} try #{} on SubCluster {}", + LOG.info("submitApplication appId {} try #{} on SubCluster {}.", applicationId, i, subClusterId); ApplicationHomeSubCluster appHomeSubCluster = @@ -482,7 +482,7 @@ public Response submitApplication(ApplicationSubmissionContextInfo newApp, .build(); } if (subClusterId == subClusterIdInStateStore) { - LOG.info("Application {} already submitted on SubCluster {}", + LOG.info("Application {} already submitted on SubCluster {}.", applicationId, subClusterId); } else { routerMetrics.incrAppsFailedSubmitted(); @@ -712,8 +712,7 @@ public AppsInfo call() { if (rmApps == null) { routerMetrics.incrMultipleAppsFailedRetrieved(); - LOG.error("Subcluster {} failed to return appReport.", - info.getSubClusterId()); + LOG.error("Subcluster {} failed to return appReport.", info.getSubClusterId()); return null; } return rmApps; @@ -873,8 +872,7 @@ private Map getNode( subclusterId, subcluster.getRMWebServiceAddress()); return interceptor.getNode(nodeId); } catch (Exception e) { - LOG.error("Subcluster {} failed to return nodeInfo.", - subclusterId); + LOG.error("Subcluster {} failed to return nodeInfo.", subclusterId, e); return null; } }); @@ -953,58 +951,28 @@ private SubClusterInfo getNodeSubcluster(String nodeId) public NodesInfo getNodes(String states) { NodesInfo nodes = new NodesInfo(); - - final Map subClustersActive; try { - subClustersActive = getActiveSubclusters(); - } catch (Exception e) { - LOG.error("Cannot get nodes: {}", e.getMessage()); - return new NodesInfo(); - } - - // Send the requests in parallel - CompletionService compSvc = - new ExecutorCompletionService(this.threadpool); - - for (final SubClusterInfo info : subClustersActive.values()) { - compSvc.submit(new Callable() { - @Override - public NodesInfo call() { - DefaultRequestInterceptorREST interceptor = - 
getOrCreateInterceptorForSubCluster( - info.getSubClusterId(), info.getRMWebServiceAddress()); - try { - NodesInfo nodesInfo = interceptor.getNodes(states); - return nodesInfo; - } catch (Exception e) { - LOG.error("Subcluster {} failed to return nodesInfo.", - info.getSubClusterId()); - return null; - } - } + Map subClustersActive = getActiveSubclusters(); + Class[] argsClasses = new Class[]{String.class}; + Object[] args = new Object[]{states}; + ClientMethod remoteMethod = new ClientMethod("getNodes", argsClasses, args); + Map nodesMap = + invokeConcurrent(subClustersActive.values(), remoteMethod, NodesInfo.class); + nodesMap.values().stream().forEach(nodesInfo -> { + nodes.addAll(nodesInfo.getNodes()); }); - } - - // Collect all the responses in parallel - - for (int i = 0; i < subClustersActive.size(); i++) { - try { - Future future = compSvc.take(); - NodesInfo nodesResponse = future.get(); - - if (nodesResponse != null) { - nodes.addAll(nodesResponse.getNodes()); - } - } catch (Throwable e) { - LOG.warn("Failed to get nodes report ", e); - } + } catch (NotFoundException e) { + LOG.error("Get all active sub cluster(s) error.", e); + } catch (YarnException e) { + LOG.error("getNodes error.", e); + } catch (IOException e) { + LOG.error("getNodes error with io error.", e); } // Delete duplicate from all the node reports got from all the available // YARN RMs. Nodes can be moved from one subclusters to another. In this // operation they result LOST/RUNNING in the previous SubCluster and // NEW/RUNNING in the new one. 
- return RouterWebServiceUtil.deleteDuplicateNodesInfo(nodes.getNodes()); } @@ -1172,7 +1140,22 @@ public ApplicationStatisticsInfo getAppStatistics(HttpServletRequest hsr, @Override public NodeToLabelsInfo getNodeToLabels(HttpServletRequest hsr) throws IOException { - throw new NotImplementedException("Code is not implemented"); + try { + Map subClustersActive = getActiveSubclusters(); + final HttpServletRequest hsrCopy = clone(hsr); + Class[] argsClasses = new Class[]{HttpServletRequest.class}; + Object[] args = new Object[]{hsrCopy}; + ClientMethod remoteMethod = new ClientMethod("getNodeToLabels", argsClasses, args); + Map nodeToLabelsInfoMap = + invokeConcurrent(subClustersActive.values(), remoteMethod, NodeToLabelsInfo.class); + return RouterWebServiceUtil.mergeNodeToLabels(nodeToLabelsInfoMap); + } catch (NotFoundException e) { + LOG.error("Get all active sub cluster(s) error.", e); + throw new IOException("Get all active sub cluster(s) error.", e); + } catch (YarnException e) { + LOG.error("getNodeToLabels error.", e); + throw new IOException("getNodeToLabels error.", e); + } } @@ -1395,7 +1378,7 @@ public void shutdown() { } private Map invokeConcurrent(Collection clusterIds, - ClientMethod request, Class clazz) { + ClientMethod request, Class clazz) throws YarnException { Map results = new HashMap<>(); @@ -1413,8 +1396,8 @@ private Map invokeConcurrent(Collection c R ret = clazz.cast(retObj); return ret; } catch (Exception e) { - LOG.error("SubCluster {} failed to call {} method.", info.getSubClusterId(), - request.getMethodName(), e); + LOG.error("SubCluster {} failed to call {} method.", + info.getSubClusterId(), request.getMethodName(), e); return null; } }); @@ -1428,7 +1411,10 @@ private Map invokeConcurrent(Collection c results.put(clusterId, response); } } catch (Throwable e) { - LOG.warn("SubCluster {} failed to {} report.", clusterId, request.getMethodName(), e); + String msg = String.format("SubCluster %s failed to %s report.", + 
clusterId, request.getMethodName()); + LOG.warn(msg, e); + throw new YarnRuntimeException(msg, e); } }); return results; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServiceUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServiceUtil.java index 7f59506923..336e772bef 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServiceUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServiceUtil.java @@ -30,6 +30,9 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Collection; +import java.util.Set; +import java.util.HashSet; import java.util.concurrent.TimeUnit; import javax.servlet.http.HttpServletRequest; @@ -43,13 +46,17 @@ import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.api.records.NodeLabel; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebAppUtil; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppsInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodesInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelsInfo; +import 
org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeToLabelsInfo; import org.apache.hadoop.yarn.server.uam.UnmanagedApplicationManager; import org.apache.hadoop.yarn.webapp.BadRequestException; import org.apache.hadoop.yarn.webapp.ForbiddenException; @@ -293,8 +300,8 @@ public static AppsInfo mergeAppsInfo(ArrayList appsInfo, boolean returnPartialResult) { AppsInfo allApps = new AppsInfo(); - Map federationAM = new HashMap(); - Map federationUAMSum = new HashMap(); + Map federationAM = new HashMap<>(); + Map federationUAMSum = new HashMap<>(); for (AppInfo a : appsInfo) { // Check if this AppInfo is an AM if (a.getAMHostHttpAddress() != null) { @@ -332,7 +339,7 @@ public static AppsInfo mergeAppsInfo(ArrayList appsInfo, } } - allApps.addAll(new ArrayList(federationAM.values())); + allApps.addAll(new ArrayList<>(federationAM.values())); return allApps; } @@ -419,7 +426,7 @@ public static NodesInfo deleteDuplicateNodesInfo(ArrayList nodes) { nodesMap.put(node.getNodeId(), node); } } - nodesInfo.addAll(new ArrayList(nodesMap.values())); + nodesInfo.addAll(new ArrayList<>(nodesMap.values())); return nodesInfo; } @@ -509,4 +516,28 @@ protected static String getMediaTypeFromHttpServletRequest( return header; } + public static NodeToLabelsInfo mergeNodeToLabels( + Map nodeToLabelsInfoMap) { + + HashMap nodeToLabels = new HashMap<>(); + Collection nodeToLabelsInfos = nodeToLabelsInfoMap.values(); + + nodeToLabelsInfos.stream().forEach(nodeToLabelsInfo -> { + for (Map.Entry item : nodeToLabelsInfo.getNodeToLabels().entrySet()) { + String key = item.getKey(); + NodeLabelsInfo itemValue = item.getValue(); + NodeLabelsInfo nodeToLabelsValue = nodeToLabels.getOrDefault(item.getKey(), null); + Set hashSet = new HashSet<>(); + if (itemValue != null) { + hashSet.addAll(itemValue.getNodeLabels()); + } + if (nodeToLabelsValue != null) { + hashSet.addAll(nodeToLabelsValue.getNodeLabels()); + } + nodeToLabels.put(key, new NodeLabelsInfo(hashSet)); + } + }); + + return new 
NodeToLabelsInfo(nodeToLabels); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java index d41a0aee58..2622dfc44a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java @@ -22,6 +22,8 @@ import java.net.ConnectException; import java.util.HashSet; import java.util.Set; +import java.util.HashMap; +import java.util.Collections; import java.util.concurrent.atomic.AtomicInteger; import javax.servlet.http.HttpServletRequest; @@ -52,6 +54,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodesInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceOptionInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeToLabelsInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelsInfo; import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo; import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo; import org.apache.hadoop.yarn.webapp.NotFoundException; @@ -279,4 +283,18 @@ public ContainersInfo getContainers(HttpServletRequest req, HttpServletResponse return containers; } + + @Override + public NodeToLabelsInfo getNodeToLabels(HttpServletRequest hsr) throws IOException { + if (!isRunning) { + throw new RuntimeException("RM is stopped"); + } + NodeLabelsInfo cpuNode = new 
NodeLabelsInfo(Collections.singleton("CPU")); + NodeLabelsInfo gpuNode = new NodeLabelsInfo(Collections.singleton("GPU")); + + HashMap nodeLabels = new HashMap<>(); + nodeLabels.put("node1", cpuNode); + nodeLabels.put("node2", gpuNode); + return new NodeToLabelsInfo(nodeLabels); + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java index 03215e4f87..959001182f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.HashMap; import javax.ws.rs.core.Response; @@ -49,6 +50,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodesInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceOptionInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeToLabelsInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelsInfo; import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo; import org.apache.hadoop.yarn.util.MonotonicClock; import org.junit.Assert; @@ -605,4 +608,22 @@ public void testGetContainersWrongFormat() { Assert.assertTrue(response.getContainers().isEmpty()); } + + @Test + public void testGetNodeToLabels() throws IOException { + NodeToLabelsInfo info = 
interceptor.getNodeToLabels(null); + HashMap map = info.getNodeToLabels(); + Assert.assertNotNull(map); + Assert.assertEquals(2, map.size()); + + NodeLabelsInfo node1Value = map.getOrDefault("node1", null); + Assert.assertNotNull(node1Value); + Assert.assertEquals(1, node1Value.getNodeLabelsName().size()); + Assert.assertEquals("CPU", node1Value.getNodeLabelsName().get(0)); + + NodeLabelsInfo node2Value = map.getOrDefault("node2", null); + Assert.assertNotNull(node2Value); + Assert.assertEquals(1, node2Value.getNodeLabelsName().size()); + Assert.assertEquals("GPU", node2Value.getNodeLabelsName().get(0)); + } }