YARN-996. REST API support for node resource configuration. Contributed by Inigo Goiri.
This commit is contained in:
parent
1ce2e91c4b
commit
7536488bbd
@ -60,7 +60,8 @@ public class JAXBContextResolver implements ContextResolver<JAXBContext> {
|
|||||||
final Class[] rootUnwrappedTypes =
|
final Class[] rootUnwrappedTypes =
|
||||||
{ NewApplication.class, ApplicationSubmissionContextInfo.class,
|
{ NewApplication.class, ApplicationSubmissionContextInfo.class,
|
||||||
ContainerLaunchContextInfo.class, LocalResourceInfo.class,
|
ContainerLaunchContextInfo.class, LocalResourceInfo.class,
|
||||||
DelegationToken.class, AppQueue.class, AppPriority.class };
|
DelegationToken.class, AppQueue.class, AppPriority.class,
|
||||||
|
ResourceOptionInfo.class };
|
||||||
|
|
||||||
this.typesContextMap = new HashMap<Class, JAXBContext>();
|
this.typesContextMap = new HashMap<Class, JAXBContext>();
|
||||||
context =
|
context =
|
||||||
|
@ -57,6 +57,9 @@ public final class RMWSConsts {
|
|||||||
/** Path for {@code RMWebServiceProtocol#getNode}. */
|
/** Path for {@code RMWebServiceProtocol#getNode}. */
|
||||||
public static final String NODES_NODEID = "/nodes/{nodeId}";
|
public static final String NODES_NODEID = "/nodes/{nodeId}";
|
||||||
|
|
||||||
|
/** Path for {@code RMWebServiceProtocol#updateNodeResource}. */
|
||||||
|
public static final String NODE_RESOURCE = "/nodes/{nodeId}/resource";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Path for {@code RMWebServiceProtocol#getApps} and
|
* Path for {@code RMWebServiceProtocol#getApps} and
|
||||||
* {@code RMWebServiceProtocol#getApp}.
|
* {@code RMWebServiceProtocol#getApp}.
|
||||||
|
@ -58,6 +58,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.RMQueueAclInfo;
|
|||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDeleteRequestInfo;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDeleteRequestInfo;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationSubmissionRequestInfo;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationSubmissionRequestInfo;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationUpdateRequestInfo;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationUpdateRequestInfo;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceOptionInfo;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -148,6 +150,19 @@ public interface RMWebServiceProtocol {
|
|||||||
*/
|
*/
|
||||||
NodeInfo getNode(String nodeId);
|
NodeInfo getNode(String nodeId);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This method changes the resources of a specific node, and it is reachable
|
||||||
|
* by using {@link RMWSConsts#NODE_RESOURCE}.
|
||||||
|
*
|
||||||
|
* @param hsr The servlet request.
|
||||||
|
* @param nodeId The node we want to retrieve the information for.
|
||||||
|
* It is a PathParam.
|
||||||
|
* @param resourceOption The resource change.
|
||||||
|
* @throws AuthorizationException If the user is not authorized.
|
||||||
|
*/
|
||||||
|
ResourceInfo updateNodeResource(HttpServletRequest hsr, String nodeId,
|
||||||
|
ResourceOptionInfo resourceOption) throws AuthorizationException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This method retrieves all the app reports in the cluster, and it is
|
* This method retrieves all the app reports in the cluster, and it is
|
||||||
* reachable by using {@link RMWSConsts#APPS}.
|
* reachable by using {@link RMWSConsts#APPS}.
|
||||||
|
@ -56,8 +56,6 @@ import javax.ws.rs.core.MediaType;
|
|||||||
import javax.ws.rs.core.Response;
|
import javax.ws.rs.core.Response;
|
||||||
import javax.ws.rs.core.Response.Status;
|
import javax.ws.rs.core.Response.Status;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
|
||||||
import org.apache.commons.logging.LogFactory;
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||||
import org.apache.hadoop.http.JettyUtils;
|
import org.apache.hadoop.http.JettyUtils;
|
||||||
@ -118,6 +116,7 @@ import org.apache.hadoop.yarn.api.records.ReservationRequest;
|
|||||||
import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
|
import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
|
||||||
import org.apache.hadoop.yarn.api.records.ReservationRequests;
|
import org.apache.hadoop.yarn.api.records.ReservationRequests;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ResourceOption;
|
||||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
@ -125,8 +124,11 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
|||||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||||
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.AdminService;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NodeLabelsUtils;
|
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NodeLabelsUtils;
|
||||||
@ -185,6 +187,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationSubmi
|
|||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationUpdateRequestInfo;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationUpdateRequestInfo;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationUpdateResponseInfo;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationUpdateResponseInfo;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceOptionInfo;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerInfo;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerInfo;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.StatisticsItemInfo;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.StatisticsItemInfo;
|
||||||
@ -202,6 +205,8 @@ import org.apache.hadoop.yarn.webapp.ForbiddenException;
|
|||||||
import org.apache.hadoop.yarn.webapp.NotFoundException;
|
import org.apache.hadoop.yarn.webapp.NotFoundException;
|
||||||
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
|
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
|
||||||
import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;
|
import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.inject.Inject;
|
import com.google.inject.Inject;
|
||||||
@ -211,8 +216,8 @@ import com.google.inject.Singleton;
|
|||||||
@Path(RMWSConsts.RM_WEB_SERVICE_PATH)
|
@Path(RMWSConsts.RM_WEB_SERVICE_PATH)
|
||||||
public class RMWebServices extends WebServices implements RMWebServiceProtocol {
|
public class RMWebServices extends WebServices implements RMWebServiceProtocol {
|
||||||
|
|
||||||
private static final Log LOG =
|
private static final Logger LOG =
|
||||||
LogFactory.getLog(RMWebServices.class.getName());
|
LoggerFactory.getLogger(RMWebServices.class.getName());
|
||||||
|
|
||||||
private final ResourceManager rm;
|
private final ResourceManager rm;
|
||||||
private static RecordFactory recordFactory =
|
private static RecordFactory recordFactory =
|
||||||
@ -482,6 +487,64 @@ public class RMWebServices extends WebServices implements RMWebServiceProtocol {
|
|||||||
return nodeInfo;
|
return nodeInfo;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@POST
|
||||||
|
@Path(RMWSConsts.NODE_RESOURCE)
|
||||||
|
@Consumes({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
|
||||||
|
@Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
|
||||||
|
MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 })
|
||||||
|
public ResourceInfo updateNodeResource(
|
||||||
|
@Context HttpServletRequest hsr,
|
||||||
|
@PathParam(RMWSConsts.NODEID) String nodeId,
|
||||||
|
ResourceOptionInfo resourceOption) throws AuthorizationException {
|
||||||
|
|
||||||
|
UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true);
|
||||||
|
initForWritableEndpoints(callerUGI, false);
|
||||||
|
|
||||||
|
RMNode rmNode = getRMNode(nodeId);
|
||||||
|
Map<NodeId, ResourceOption> nodeResourceMap =
|
||||||
|
Collections.singletonMap(
|
||||||
|
rmNode.getNodeID(), resourceOption.getResourceOption());
|
||||||
|
UpdateNodeResourceRequest updateRequest =
|
||||||
|
UpdateNodeResourceRequest.newInstance(nodeResourceMap);
|
||||||
|
|
||||||
|
try {
|
||||||
|
RMContext rmContext = this.rm.getRMContext();
|
||||||
|
AdminService admin = rmContext.getRMAdminService();
|
||||||
|
admin.updateNodeResource(updateRequest);
|
||||||
|
} catch (YarnException e) {
|
||||||
|
String message = "Failed to update the node resource " +
|
||||||
|
rmNode.getNodeID() + ".";
|
||||||
|
LOG.error(message, e);
|
||||||
|
throw new YarnRuntimeException(message, e);
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.error("Failed to update the node resource {}.",
|
||||||
|
rmNode.getNodeID(), e);
|
||||||
|
}
|
||||||
|
|
||||||
|
return new ResourceInfo(rmNode.getTotalCapability());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the RMNode in the RM from the node identifier.
|
||||||
|
* @param nodeId Node identifier.
|
||||||
|
* @return The RMNode in the RM.
|
||||||
|
*/
|
||||||
|
private RMNode getRMNode(final String nodeId) {
|
||||||
|
if (nodeId == null || nodeId.isEmpty()) {
|
||||||
|
throw new NotFoundException("nodeId, " + nodeId + ", is empty or null");
|
||||||
|
}
|
||||||
|
NodeId nid = NodeId.fromString(nodeId);
|
||||||
|
RMContext rmContext = this.rm.getRMContext();
|
||||||
|
RMNode ni = rmContext.getRMNodes().get(nid);
|
||||||
|
if (ni == null) {
|
||||||
|
ni = rmContext.getInactiveRMNodes().get(nid);
|
||||||
|
if (ni == null) {
|
||||||
|
throw new NotFoundException("nodeId, " + nodeId + ", is not found");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ni;
|
||||||
|
}
|
||||||
|
|
||||||
@GET
|
@GET
|
||||||
@Path(RMWSConsts.APPS)
|
@Path(RMWSConsts.APPS)
|
||||||
@Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
|
@Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
|
||||||
|
@ -64,6 +64,7 @@ public class NodeInfo {
|
|||||||
protected ResourceInfo usedResource;
|
protected ResourceInfo usedResource;
|
||||||
protected ResourceInfo availableResource;
|
protected ResourceInfo availableResource;
|
||||||
protected NodeAttributesInfo nodeAttributesInfo;
|
protected NodeAttributesInfo nodeAttributesInfo;
|
||||||
|
private ResourceInfo totalResource;
|
||||||
|
|
||||||
public NodeInfo() {
|
public NodeInfo() {
|
||||||
} // JAXB needs this
|
} // JAXB needs this
|
||||||
@ -92,6 +93,7 @@ public class NodeInfo {
|
|||||||
this.lastHealthUpdate = ni.getLastHealthReportTime();
|
this.lastHealthUpdate = ni.getLastHealthReportTime();
|
||||||
this.healthReport = String.valueOf(ni.getHealthReport());
|
this.healthReport = String.valueOf(ni.getHealthReport());
|
||||||
this.version = ni.getNodeManagerVersion();
|
this.version = ni.getNodeManagerVersion();
|
||||||
|
this.totalResource = new ResourceInfo(ni.getTotalCapability());
|
||||||
|
|
||||||
// Status of opportunistic containers.
|
// Status of opportunistic containers.
|
||||||
this.numRunningOpportContainers = 0;
|
this.numRunningOpportContainers = 0;
|
||||||
@ -242,4 +244,11 @@ public class NodeInfo {
|
|||||||
this.lastHealthUpdate = lastHealthUpdate;
|
this.lastHealthUpdate = lastHealthUpdate;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setTotalResource(ResourceInfo total) {
|
||||||
|
this.totalResource = total;
|
||||||
|
}
|
||||||
|
|
||||||
|
public ResourceInfo getTotalResource() {
|
||||||
|
return this.totalResource;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,65 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao;
|
||||||
|
|
||||||
|
import javax.xml.bind.annotation.XmlAccessType;
|
||||||
|
import javax.xml.bind.annotation.XmlAccessorType;
|
||||||
|
import javax.xml.bind.annotation.XmlElement;
|
||||||
|
import javax.xml.bind.annotation.XmlRootElement;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.api.records.ResourceOption;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A JAXB representation of a {link ResourceOption}.
|
||||||
|
*/
|
||||||
|
@XmlRootElement(name = "resourceOption")
|
||||||
|
@XmlAccessorType(XmlAccessType.NONE)
|
||||||
|
public class ResourceOptionInfo {
|
||||||
|
|
||||||
|
@XmlElement(name = "resource")
|
||||||
|
private ResourceInfo resource = new ResourceInfo();
|
||||||
|
@XmlElement(name = "overCommitTimeout")
|
||||||
|
private int overCommitTimeout;
|
||||||
|
|
||||||
|
/** Internal resource option for caching. */
|
||||||
|
private ResourceOption resourceOption;
|
||||||
|
|
||||||
|
|
||||||
|
public ResourceOptionInfo() {
|
||||||
|
} // JAXB needs this
|
||||||
|
|
||||||
|
public ResourceOptionInfo(ResourceOption resourceOption) {
|
||||||
|
if (resourceOption != null) {
|
||||||
|
this.resource = new ResourceInfo(resourceOption.getResource());
|
||||||
|
this.overCommitTimeout = resourceOption.getOverCommitTimeout();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public ResourceOption getResourceOption() {
|
||||||
|
if (resourceOption == null) {
|
||||||
|
resourceOption = ResourceOption.newInstance(
|
||||||
|
resource.getResource(), overCommitTimeout);
|
||||||
|
}
|
||||||
|
return resourceOption;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return getResourceOption().toString();
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,27 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This package contains the web data access objects (DAO) for the RM.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
@InterfaceStability.Unstable
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability;
|
@ -25,28 +25,37 @@ import static org.junit.Assert.fail;
|
|||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
import java.io.StringReader;
|
import java.io.StringReader;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.EnumSet;
|
import java.util.EnumSet;
|
||||||
|
import java.util.Enumeration;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Properties;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.TreeMap;
|
import java.util.TreeMap;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
|
|
||||||
|
import javax.servlet.FilterConfig;
|
||||||
|
import javax.servlet.ServletException;
|
||||||
import javax.ws.rs.core.MediaType;
|
import javax.ws.rs.core.MediaType;
|
||||||
import javax.xml.parsers.DocumentBuilder;
|
import javax.xml.parsers.DocumentBuilder;
|
||||||
import javax.xml.parsers.DocumentBuilderFactory;
|
import javax.xml.parsers.DocumentBuilderFactory;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
|
||||||
import org.apache.hadoop.http.JettyUtils;
|
import org.apache.hadoop.http.JettyUtils;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.hadoop.security.authentication.server.AuthenticationFilter;
|
||||||
|
import org.apache.hadoop.security.authentication.server.PseudoAuthenticationHandler;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeAttribute;
|
import org.apache.hadoop.yarn.api.records.NodeAttribute;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeAttributeType;
|
import org.apache.hadoop.yarn.api.records.NodeAttributeType;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeState;
|
import org.apache.hadoop.yarn.api.records.NodeState;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ResourceOption;
|
||||||
import org.apache.hadoop.yarn.api.records.ResourceUtilization;
|
import org.apache.hadoop.yarn.api.records.ResourceUtilization;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||||
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
|
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
|
||||||
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
|
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
|
||||||
@ -66,6 +75,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent;
|
|||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeInfo;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceOptionInfo;
|
||||||
import org.apache.hadoop.yarn.util.Records;
|
import org.apache.hadoop.yarn.util.Records;
|
||||||
import org.apache.hadoop.yarn.util.RackResolver;
|
import org.apache.hadoop.yarn.util.RackResolver;
|
||||||
import org.apache.hadoop.yarn.util.YarnVersionInfo;
|
import org.apache.hadoop.yarn.util.YarnVersionInfo;
|
||||||
@ -85,6 +97,7 @@ import org.xml.sax.InputSource;
|
|||||||
|
|
||||||
import com.google.common.base.Joiner;
|
import com.google.common.base.Joiner;
|
||||||
import com.google.inject.Guice;
|
import com.google.inject.Guice;
|
||||||
|
import com.google.inject.Singleton;
|
||||||
import com.google.inject.servlet.ServletModule;
|
import com.google.inject.servlet.ServletModule;
|
||||||
import com.sun.jersey.api.client.ClientResponse;
|
import com.sun.jersey.api.client.ClientResponse;
|
||||||
import com.sun.jersey.api.client.ClientResponse.Status;
|
import com.sun.jersey.api.client.ClientResponse.Status;
|
||||||
@ -96,22 +109,58 @@ import com.sun.jersey.test.framework.WebAppDescriptor;
|
|||||||
public class TestRMWebServicesNodes extends JerseyTestBase {
|
public class TestRMWebServicesNodes extends JerseyTestBase {
|
||||||
|
|
||||||
private static MockRM rm;
|
private static MockRM rm;
|
||||||
|
private static YarnConfiguration conf;
|
||||||
|
|
||||||
|
private static String userName;
|
||||||
|
|
||||||
private static class WebServletModule extends ServletModule {
|
private static class WebServletModule extends ServletModule {
|
||||||
@Override
|
@Override
|
||||||
protected void configureServlets() {
|
protected void configureServlets() {
|
||||||
bind(JAXBContextResolver.class);
|
bind(JAXBContextResolver.class);
|
||||||
|
try {
|
||||||
|
userName = UserGroupInformation.getCurrentUser().getShortUserName();
|
||||||
|
} catch (IOException ioe) {
|
||||||
|
throw new RuntimeException("Unable to get current user name "
|
||||||
|
+ ioe.getMessage(), ioe);
|
||||||
|
}
|
||||||
|
conf = new YarnConfiguration();
|
||||||
|
conf.set(YarnConfiguration.YARN_ADMIN_ACL, userName);
|
||||||
bind(RMWebServices.class);
|
bind(RMWebServices.class);
|
||||||
bind(GenericExceptionHandler.class);
|
bind(GenericExceptionHandler.class);
|
||||||
rm = new MockRM(new Configuration());
|
rm = new MockRM(conf);
|
||||||
rm.getRMContext().getContainerTokenSecretManager().rollMasterKey();
|
rm.getRMContext().getContainerTokenSecretManager().rollMasterKey();
|
||||||
rm.getRMContext().getNMTokenSecretManager().rollMasterKey();
|
rm.getRMContext().getNMTokenSecretManager().rollMasterKey();
|
||||||
rm.disableDrainEventsImplicitly();
|
rm.disableDrainEventsImplicitly();
|
||||||
bind(ResourceManager.class).toInstance(rm);
|
bind(ResourceManager.class).toInstance(rm);
|
||||||
|
filter("/*").through(TestRMCustomAuthFilter.class);
|
||||||
serve("/*").with(GuiceContainer.class);
|
serve("/*").with(GuiceContainer.class);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Custom filter to be able to test auth methods and let the other ones go.
|
||||||
|
*/
|
||||||
|
@Singleton
|
||||||
|
public static class TestRMCustomAuthFilter extends AuthenticationFilter {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected Properties getConfiguration(String configPrefix,
|
||||||
|
FilterConfig filterConfig) throws ServletException {
|
||||||
|
Properties props = new Properties();
|
||||||
|
Enumeration<?> names = filterConfig.getInitParameterNames();
|
||||||
|
while (names.hasMoreElements()) {
|
||||||
|
String name = (String) names.nextElement();
|
||||||
|
if (name.startsWith(configPrefix)) {
|
||||||
|
String value = filterConfig.getInitParameter(name);
|
||||||
|
props.put(name.substring(configPrefix.length()), value);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
props.put(AuthenticationFilter.AUTH_TYPE, "simple");
|
||||||
|
props.put(PseudoAuthenticationHandler.ANONYMOUS_ALLOWED, "true");
|
||||||
|
return props;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static {
|
static {
|
||||||
GuiceServletConfig.setInjector(
|
GuiceServletConfig.setInjector(
|
||||||
Guice.createInjector(new WebServletModule()));
|
Guice.createInjector(new WebServletModule()));
|
||||||
@ -541,7 +590,7 @@ public class TestRMWebServicesNodes extends JerseyTestBase {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void verifyNonexistNodeException(String message, String type, String classname) {
|
private void verifyNonexistNodeException(String message, String type, String classname) {
|
||||||
assertTrue("exception message incorrect",
|
assertTrue("exception message incorrect: " + message,
|
||||||
"java.lang.Exception: nodeId, node_invalid:99, is not found"
|
"java.lang.Exception: nodeId, node_invalid:99, is not found"
|
||||||
.matches(message));
|
.matches(message));
|
||||||
assertTrue("exception type incorrect", "NotFoundException".matches(type));
|
assertTrue("exception type incorrect", "NotFoundException".matches(type));
|
||||||
@ -714,6 +763,64 @@ public class TestRMWebServicesNodes extends JerseyTestBase {
|
|||||||
verifyNodeInfo(info, rmnode1);
|
verifyNodeInfo(info, rmnode1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testUpdateNodeResource() throws Exception {
|
||||||
|
WebResource r = resource().path(RMWSConsts.RM_WEB_SERVICE_PATH);
|
||||||
|
|
||||||
|
r = r.queryParam("user.name", userName);
|
||||||
|
RMNode rmnode = getRunningRMNode("h1", 1234, 5120);
|
||||||
|
String rmnodeId = rmnode.getNodeID().toString();
|
||||||
|
assertEquals("h1:1234", rmnodeId);
|
||||||
|
|
||||||
|
// assert memory and default vcores
|
||||||
|
ClientResponse response = r.path(RMWSConsts.NODES).path(rmnodeId)
|
||||||
|
.accept(MediaType.APPLICATION_XML)
|
||||||
|
.get(ClientResponse.class);
|
||||||
|
NodeInfo nodeInfo0 = response.getEntity(NodeInfo.class);
|
||||||
|
ResourceInfo nodeResource0 = nodeInfo0.getTotalResource();
|
||||||
|
assertEquals(5120, nodeResource0.getMemorySize());
|
||||||
|
assertEquals(4, nodeResource0.getvCores());
|
||||||
|
|
||||||
|
// the RM needs to be running to process the resource update
|
||||||
|
rm.start();
|
||||||
|
|
||||||
|
// update memory to 8192MB and 5 cores
|
||||||
|
Resource resource = Resource.newInstance(8192, 5);
|
||||||
|
ResourceOptionInfo resourceOption = new ResourceOptionInfo(
|
||||||
|
ResourceOption.newInstance(resource, 1000));
|
||||||
|
response = r.path(RMWSConsts.NODES).path(rmnodeId).path("resource")
|
||||||
|
.entity(resourceOption, MediaType.APPLICATION_XML_TYPE)
|
||||||
|
.accept(MediaType.APPLICATION_XML)
|
||||||
|
.post(ClientResponse.class);
|
||||||
|
assertResponseStatusCode(Status.OK, response.getStatusInfo());
|
||||||
|
ResourceInfo updatedResource = response.getEntity(ResourceInfo.class);
|
||||||
|
assertEquals(8192, updatedResource.getMemorySize());
|
||||||
|
assertEquals(5, updatedResource.getvCores());
|
||||||
|
|
||||||
|
// assert updated memory and cores
|
||||||
|
response = r.path(RMWSConsts.NODES).path(rmnodeId)
|
||||||
|
.accept(MediaType.APPLICATION_XML)
|
||||||
|
.get(ClientResponse.class);
|
||||||
|
NodeInfo nodeInfo1 = response.getEntity(NodeInfo.class);
|
||||||
|
ResourceInfo nodeResource1 = nodeInfo1.getTotalResource();
|
||||||
|
assertEquals(8192, nodeResource1.getMemorySize());
|
||||||
|
assertEquals(5, nodeResource1.getvCores());
|
||||||
|
|
||||||
|
// test non existing node
|
||||||
|
response = r.path(RMWSConsts.NODES).path("badnode").path("resource")
|
||||||
|
.entity(resourceOption, MediaType.APPLICATION_XML_TYPE)
|
||||||
|
.accept(MediaType.APPLICATION_JSON)
|
||||||
|
.post(ClientResponse.class);
|
||||||
|
assertResponseStatusCode(Status.BAD_REQUEST, response.getStatusInfo());
|
||||||
|
JSONObject json = response.getEntity(JSONObject.class);
|
||||||
|
JSONObject exception = json.getJSONObject("RemoteException");
|
||||||
|
assertEquals("IllegalArgumentException", exception.getString("exception"));
|
||||||
|
String msg = exception.getString("message");
|
||||||
|
assertTrue("Wrong message: " + msg, msg.startsWith("Invalid NodeId"));
|
||||||
|
|
||||||
|
rm.stop();
|
||||||
|
}
|
||||||
|
|
||||||
public void verifyNodesXML(NodeList nodes, RMNode nm)
|
public void verifyNodesXML(NodeList nodes, RMNode nm)
|
||||||
throws JSONException,
|
throws JSONException,
|
||||||
Exception {
|
Exception {
|
||||||
@ -750,7 +857,7 @@ public class TestRMWebServicesNodes extends JerseyTestBase {
|
|||||||
|
|
||||||
public void verifyNodeInfo(JSONObject nodeInfo, RMNode nm)
|
public void verifyNodeInfo(JSONObject nodeInfo, RMNode nm)
|
||||||
throws JSONException, Exception {
|
throws JSONException, Exception {
|
||||||
assertEquals("incorrect number of elements", 20, nodeInfo.length());
|
assertEquals("incorrect number of elements", 21, nodeInfo.length());
|
||||||
|
|
||||||
JSONObject resourceInfo = nodeInfo.getJSONObject("resourceUtilization");
|
JSONObject resourceInfo = nodeInfo.getJSONObject("resourceUtilization");
|
||||||
verifyNodeInfoGeneric(nm, nodeInfo.getString("state"),
|
verifyNodeInfoGeneric(nm, nodeInfo.getString("state"),
|
||||||
|
@ -48,6 +48,13 @@
|
|||||||
<artifactId>hadoop-yarn-common</artifactId>
|
<artifactId>hadoop-yarn-common</artifactId>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.hadoop</groupId>
|
||||||
|
<artifactId>hadoop-yarn-common</artifactId>
|
||||||
|
<type>test-jar</type>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.apache.hadoop</groupId>
|
<groupId>org.apache.hadoop</groupId>
|
||||||
<artifactId>hadoop-yarn-server-common</artifactId>
|
<artifactId>hadoop-yarn-server-common</artifactId>
|
||||||
|
@ -58,6 +58,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.RMQueueAclInfo;
|
|||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDeleteRequestInfo;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDeleteRequestInfo;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationSubmissionRequestInfo;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationSubmissionRequestInfo;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationUpdateRequestInfo;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationUpdateRequestInfo;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceOptionInfo;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
|
||||||
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
|
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
|
||||||
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
|
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
|
||||||
@ -155,6 +157,16 @@ public class DefaultRequestInterceptorREST
|
|||||||
null);
|
null);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ResourceInfo updateNodeResource(HttpServletRequest hsr,
|
||||||
|
String nodeId, ResourceOptionInfo resourceOption) {
|
||||||
|
final String nodePath =
|
||||||
|
RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.NODES + "/" + nodeId;
|
||||||
|
return RouterWebServiceUtil.genericForward(webAppAddress, hsr,
|
||||||
|
ResourceInfo.class, HTTPMethods.POST,
|
||||||
|
nodePath + "/resource", resourceOption, null);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public AppsInfo getApps(HttpServletRequest hsr, String stateQuery,
|
public AppsInfo getApps(HttpServletRequest hsr, String stateQuery,
|
||||||
Set<String> statesQuery, String finalStatusQuery, String userQuery,
|
Set<String> statesQuery, String finalStatusQuery, String userQuery,
|
||||||
|
@ -21,9 +21,11 @@ package org.apache.hadoop.yarn.server.router.webapp;
|
|||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.security.Principal;
|
import java.security.Principal;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collection;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Map.Entry;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.Callable;
|
import java.util.concurrent.Callable;
|
||||||
@ -84,6 +86,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.RMQueueAclInfo;
|
|||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDeleteRequestInfo;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDeleteRequestInfo;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationSubmissionRequestInfo;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationSubmissionRequestInfo;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationUpdateRequestInfo;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationUpdateRequestInfo;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceOptionInfo;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
|
||||||
import org.apache.hadoop.yarn.server.router.RouterMetrics;
|
import org.apache.hadoop.yarn.server.router.RouterMetrics;
|
||||||
import org.apache.hadoop.yarn.server.router.RouterServerUtil;
|
import org.apache.hadoop.yarn.server.router.RouterServerUtil;
|
||||||
@ -778,6 +782,20 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the active subclusters in the federation.
|
||||||
|
* @return Map from subcluster id to its info.
|
||||||
|
* @throws NotFoundException If the subclusters cannot be found.
|
||||||
|
*/
|
||||||
|
private Map<SubClusterId, SubClusterInfo> getActiveSubclusters()
|
||||||
|
throws NotFoundException {
|
||||||
|
try {
|
||||||
|
return federationFacade.getSubClusters(true);
|
||||||
|
} catch (YarnException e) {
|
||||||
|
throw new NotFoundException(e.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The YARN Router will forward to the request to all the SubClusters to find
|
* The YARN Router will forward to the request to all the SubClusters to find
|
||||||
* where the node is running.
|
* where the node is running.
|
||||||
@ -796,67 +814,115 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
|
|||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public NodeInfo getNode(String nodeId) {
|
public NodeInfo getNode(String nodeId) {
|
||||||
Map<SubClusterId, SubClusterInfo> subClustersActive = null;
|
final Map<SubClusterId, SubClusterInfo> subClustersActive =
|
||||||
try {
|
getActiveSubclusters();
|
||||||
subClustersActive = federationFacade.getSubClusters(true);
|
|
||||||
} catch (YarnException e) {
|
|
||||||
throw new NotFoundException(e.getMessage());
|
|
||||||
}
|
|
||||||
|
|
||||||
if (subClustersActive.isEmpty()) {
|
if (subClustersActive.isEmpty()) {
|
||||||
throw new NotFoundException(
|
throw new NotFoundException(
|
||||||
FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE);
|
FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE);
|
||||||
}
|
}
|
||||||
|
final Map<SubClusterInfo, NodeInfo> results =
|
||||||
|
getNode(subClustersActive.values(), nodeId);
|
||||||
|
|
||||||
// Send the requests in parallel
|
// Collect the responses
|
||||||
CompletionService<NodeInfo> compSvc =
|
|
||||||
new ExecutorCompletionService<NodeInfo>(this.threadpool);
|
|
||||||
|
|
||||||
for (final SubClusterInfo info : subClustersActive.values()) {
|
|
||||||
compSvc.submit(new Callable<NodeInfo>() {
|
|
||||||
@Override
|
|
||||||
public NodeInfo call() {
|
|
||||||
DefaultRequestInterceptorREST interceptor =
|
|
||||||
getOrCreateInterceptorForSubCluster(
|
|
||||||
info.getSubClusterId(), info.getRMWebServiceAddress());
|
|
||||||
try {
|
|
||||||
NodeInfo nodeInfo = interceptor.getNode(nodeId);
|
|
||||||
return nodeInfo;
|
|
||||||
} catch (Exception e) {
|
|
||||||
LOG.error("Subcluster {} failed to return nodeInfo.",
|
|
||||||
info.getSubClusterId());
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
// Collect all the responses in parallel
|
|
||||||
NodeInfo nodeInfo = null;
|
NodeInfo nodeInfo = null;
|
||||||
for (int i = 0; i < subClustersActive.size(); i++) {
|
for (NodeInfo nodeResponse : results.values()) {
|
||||||
try {
|
try {
|
||||||
Future<NodeInfo> future = compSvc.take();
|
// Check if the node was already found in a different SubCluster and
|
||||||
NodeInfo nodeResponse = future.get();
|
// it has an old health report
|
||||||
|
if (nodeInfo == null || nodeInfo.getLastHealthUpdate() <
|
||||||
// Check if the node was found in this SubCluster
|
nodeResponse.getLastHealthUpdate()) {
|
||||||
if (nodeResponse != null) {
|
nodeInfo = nodeResponse;
|
||||||
// Check if the node was already found in a different SubCluster and
|
|
||||||
// it has an old health report
|
|
||||||
if (nodeInfo == null || nodeInfo.getLastHealthUpdate() <
|
|
||||||
nodeResponse.getLastHealthUpdate()) {
|
|
||||||
nodeInfo = nodeResponse;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
} catch (Throwable e) {
|
} catch (Throwable e) {
|
||||||
LOG.warn("Failed to get node report ", e);
|
LOG.warn("Failed to get node report ", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (nodeInfo == null) {
|
if (nodeInfo == null) {
|
||||||
throw new NotFoundException("nodeId, " + nodeId + ", is not found");
|
throw new NotFoundException("nodeId, " + nodeId + ", is not found");
|
||||||
}
|
}
|
||||||
return nodeInfo;
|
return nodeInfo;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get a node and the subcluster where it is.
|
||||||
|
* @param subClusters Subclusters where to search.
|
||||||
|
* @param nodeId Identifier of the node we are looking for.
|
||||||
|
* @return Map between subcluster and node.
|
||||||
|
*/
|
||||||
|
private Map<SubClusterInfo, NodeInfo> getNode(
|
||||||
|
Collection<SubClusterInfo> subClusters, String nodeId) {
|
||||||
|
|
||||||
|
// Send the requests in parallel
|
||||||
|
CompletionService<NodeInfo> compSvc =
|
||||||
|
new ExecutorCompletionService<NodeInfo>(this.threadpool);
|
||||||
|
final Map<SubClusterInfo, Future<NodeInfo>> futures = new HashMap<>();
|
||||||
|
for (final SubClusterInfo subcluster : subClusters) {
|
||||||
|
final SubClusterId subclusterId = subcluster.getSubClusterId();
|
||||||
|
Future<NodeInfo> result = compSvc.submit(() -> {
|
||||||
|
try {
|
||||||
|
DefaultRequestInterceptorREST interceptor =
|
||||||
|
getOrCreateInterceptorForSubCluster(
|
||||||
|
subclusterId, subcluster.getRMWebServiceAddress());
|
||||||
|
return interceptor.getNode(nodeId);
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.error("Subcluster {} failed to return nodeInfo.",
|
||||||
|
subclusterId);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
futures.put(subcluster, result);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Collect the results
|
||||||
|
final Map<SubClusterInfo, NodeInfo> results = new HashMap<>();
|
||||||
|
for (Entry<SubClusterInfo, Future<NodeInfo>> entry : futures.entrySet()) {
|
||||||
|
try {
|
||||||
|
final Future<NodeInfo> future = entry.getValue();
|
||||||
|
final NodeInfo nodeInfo = future.get();
|
||||||
|
// Check if the node was found in this SubCluster
|
||||||
|
if (nodeInfo != null) {
|
||||||
|
SubClusterInfo subcluster = entry.getKey();
|
||||||
|
results.put(subcluster, nodeInfo);
|
||||||
|
}
|
||||||
|
} catch (Throwable e) {
|
||||||
|
LOG.warn("Failed to get node report ", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return results;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the subcluster a node belongs to.
|
||||||
|
* @param nodeId Identifier of the node we are looking for.
|
||||||
|
* @return The subcluster containing the node.
|
||||||
|
* @throws NotFoundException If the node cannot be found.
|
||||||
|
*/
|
||||||
|
private SubClusterInfo getNodeSubcluster(String nodeId)
|
||||||
|
throws NotFoundException {
|
||||||
|
|
||||||
|
final Collection<SubClusterInfo> subClusters =
|
||||||
|
getActiveSubclusters().values();
|
||||||
|
final Map<SubClusterInfo, NodeInfo> results =
|
||||||
|
getNode(subClusters, nodeId);
|
||||||
|
SubClusterInfo subcluster = null;
|
||||||
|
NodeInfo nodeInfo = null;
|
||||||
|
for (Entry<SubClusterInfo, NodeInfo> entry : results.entrySet()) {
|
||||||
|
NodeInfo nodeResponse = entry.getValue();
|
||||||
|
if (nodeInfo == null || nodeInfo.getLastHealthUpdate() <
|
||||||
|
nodeResponse.getLastHealthUpdate()) {
|
||||||
|
subcluster = entry.getKey();
|
||||||
|
nodeInfo = nodeResponse;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (subcluster == null) {
|
||||||
|
throw new NotFoundException(
|
||||||
|
"Cannot find " + nodeId + " in any subcluster");
|
||||||
|
}
|
||||||
|
return subcluster;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The YARN Router will forward the request to all the YARN RMs in parallel,
|
* The YARN Router will forward the request to all the YARN RMs in parallel,
|
||||||
* after that it will remove all the duplicated NodeInfo by using the NodeId.
|
* after that it will remove all the duplicated NodeInfo by using the NodeId.
|
||||||
@ -881,10 +947,10 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
|
|||||||
|
|
||||||
NodesInfo nodes = new NodesInfo();
|
NodesInfo nodes = new NodesInfo();
|
||||||
|
|
||||||
Map<SubClusterId, SubClusterInfo> subClustersActive = null;
|
final Map<SubClusterId, SubClusterInfo> subClustersActive;
|
||||||
try {
|
try {
|
||||||
subClustersActive = federationFacade.getSubClusters(true);
|
subClustersActive = getActiveSubclusters();
|
||||||
} catch (YarnException e) {
|
} catch (Exception e) {
|
||||||
LOG.error("Cannot get nodes: {}", e.getMessage());
|
LOG.error("Cannot get nodes: {}", e.getMessage());
|
||||||
return new NodesInfo();
|
return new NodesInfo();
|
||||||
}
|
}
|
||||||
@ -935,14 +1001,25 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
|
|||||||
return RouterWebServiceUtil.deleteDuplicateNodesInfo(nodes.getNodes());
|
return RouterWebServiceUtil.deleteDuplicateNodesInfo(nodes.getNodes());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ResourceInfo updateNodeResource(HttpServletRequest hsr,
|
||||||
|
String nodeId, ResourceOptionInfo resourceOption) {
|
||||||
|
SubClusterInfo subcluster = getNodeSubcluster(nodeId);
|
||||||
|
DefaultRequestInterceptorREST interceptor =
|
||||||
|
getOrCreateInterceptorForSubCluster(
|
||||||
|
subcluster.getSubClusterId(),
|
||||||
|
subcluster.getRMWebServiceAddress());
|
||||||
|
return interceptor.updateNodeResource(hsr, nodeId, resourceOption);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ClusterMetricsInfo getClusterMetricsInfo() {
|
public ClusterMetricsInfo getClusterMetricsInfo() {
|
||||||
ClusterMetricsInfo metrics = new ClusterMetricsInfo();
|
ClusterMetricsInfo metrics = new ClusterMetricsInfo();
|
||||||
|
|
||||||
Map<SubClusterId, SubClusterInfo> subClustersActive = null;
|
final Map<SubClusterId, SubClusterInfo> subClustersActive;
|
||||||
try {
|
try {
|
||||||
subClustersActive = federationFacade.getSubClusters(true);
|
subClustersActive = getActiveSubclusters();
|
||||||
} catch (YarnException e) {
|
} catch (Exception e) {
|
||||||
LOG.error(e.getLocalizedMessage());
|
LOG.error(e.getLocalizedMessage());
|
||||||
return metrics;
|
return metrics;
|
||||||
}
|
}
|
||||||
|
@ -28,6 +28,7 @@ import java.util.Set;
|
|||||||
|
|
||||||
import javax.servlet.http.HttpServletRequest;
|
import javax.servlet.http.HttpServletRequest;
|
||||||
import javax.servlet.http.HttpServletResponse;
|
import javax.servlet.http.HttpServletResponse;
|
||||||
|
import javax.ws.rs.Consumes;
|
||||||
import javax.ws.rs.DELETE;
|
import javax.ws.rs.DELETE;
|
||||||
import javax.ws.rs.DefaultValue;
|
import javax.ws.rs.DefaultValue;
|
||||||
import javax.ws.rs.FormParam;
|
import javax.ws.rs.FormParam;
|
||||||
@ -80,6 +81,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.RMQueueAclInfo;
|
|||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDeleteRequestInfo;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDeleteRequestInfo;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationSubmissionRequestInfo;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationSubmissionRequestInfo;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationUpdateRequestInfo;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationUpdateRequestInfo;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceOptionInfo;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
|
||||||
import org.apache.hadoop.yarn.server.router.Router;
|
import org.apache.hadoop.yarn.server.router.Router;
|
||||||
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
|
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
|
||||||
@ -394,6 +397,22 @@ public class RouterWebServices implements RMWebServiceProtocol {
|
|||||||
return pipeline.getRootInterceptor().getNode(nodeId);
|
return pipeline.getRootInterceptor().getNode(nodeId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@POST
|
||||||
|
@Path(RMWSConsts.NODE_RESOURCE)
|
||||||
|
@Consumes({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
|
||||||
|
@Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
|
||||||
|
MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 })
|
||||||
|
@Override
|
||||||
|
public ResourceInfo updateNodeResource(
|
||||||
|
@Context HttpServletRequest hsr,
|
||||||
|
@PathParam(RMWSConsts.NODEID) String nodeId,
|
||||||
|
ResourceOptionInfo resourceOption) throws AuthorizationException {
|
||||||
|
init();
|
||||||
|
RequestInterceptorChainWrapper pipeline = getInterceptorChain(null);
|
||||||
|
return pipeline.getRootInterceptor().updateNodeResource(
|
||||||
|
hsr, nodeId, resourceOption);
|
||||||
|
}
|
||||||
|
|
||||||
@GET
|
@GET
|
||||||
@Path(RMWSConsts.APPS)
|
@Path(RMWSConsts.APPS)
|
||||||
@Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
|
@Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
|
||||||
|
@ -31,6 +31,7 @@ import javax.ws.rs.core.Response.Status;
|
|||||||
|
|
||||||
import org.apache.hadoop.security.authorize.AuthorizationException;
|
import org.apache.hadoop.security.authorize.AuthorizationException;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
|
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
|
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
|
||||||
@ -43,6 +44,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NewApplication;
|
|||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeInfo;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeInfo;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodesInfo;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodesInfo;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceOptionInfo;
|
||||||
import org.apache.hadoop.yarn.webapp.NotFoundException;
|
import org.apache.hadoop.yarn.webapp.NotFoundException;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
@ -177,6 +179,16 @@ public class MockDefaultRequestInterceptorREST
|
|||||||
return nodes;
|
return nodes;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ResourceInfo updateNodeResource(HttpServletRequest hsr,
|
||||||
|
String nodeId, ResourceOptionInfo resourceOption) {
|
||||||
|
if (!isRunning) {
|
||||||
|
throw new RuntimeException("RM is stopped");
|
||||||
|
}
|
||||||
|
Resource resource = resourceOption.getResourceOption().getResource();
|
||||||
|
return new ResourceInfo(resource);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ClusterMetricsInfo getClusterMetricsInfo() {
|
public ClusterMetricsInfo getClusterMetricsInfo() {
|
||||||
if (!isRunning) {
|
if (!isRunning) {
|
||||||
|
@ -55,6 +55,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.RMQueueAclInfo;
|
|||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDeleteRequestInfo;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDeleteRequestInfo;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationSubmissionRequestInfo;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationSubmissionRequestInfo;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationUpdateRequestInfo;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationUpdateRequestInfo;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceOptionInfo;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
|
||||||
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
|
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
|
||||||
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
|
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
|
||||||
@ -114,6 +116,12 @@ public class MockRESTRequestInterceptor extends AbstractRESTRequestInterceptor {
|
|||||||
return new NodeInfo();
|
return new NodeInfo();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ResourceInfo updateNodeResource(HttpServletRequest hsr, String nodeId,
|
||||||
|
ResourceOptionInfo resourceOption) throws AuthorizationException {
|
||||||
|
return new ResourceInfo();
|
||||||
|
}
|
||||||
|
|
||||||
@SuppressWarnings("checkstyle:parameternumber")
|
@SuppressWarnings("checkstyle:parameternumber")
|
||||||
@Override
|
@Override
|
||||||
public AppsInfo getApps(HttpServletRequest hsr, String stateQuery,
|
public AppsInfo getApps(HttpServletRequest hsr, String stateQuery,
|
||||||
@ -349,5 +357,4 @@ public class MockRESTRequestInterceptor extends AbstractRESTRequestInterceptor {
|
|||||||
String containerId) {
|
String containerId) {
|
||||||
return new ContainerInfo();
|
return new ContainerInfo();
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
@ -53,6 +53,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.RMQueueAclInfo;
|
|||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDeleteRequestInfo;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDeleteRequestInfo;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationSubmissionRequestInfo;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationSubmissionRequestInfo;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationUpdateRequestInfo;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationUpdateRequestInfo;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceOptionInfo;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
|
||||||
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
|
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
|
||||||
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
|
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
|
||||||
@ -139,6 +141,13 @@ public class PassThroughRESTRequestInterceptor
|
|||||||
return getNextInterceptor().getNode(nodeId);
|
return getNextInterceptor().getNode(nodeId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ResourceInfo updateNodeResource(HttpServletRequest hsr, String nodeId,
|
||||||
|
ResourceOptionInfo resourceOption) throws AuthorizationException {
|
||||||
|
return getNextInterceptor().updateNodeResource(
|
||||||
|
hsr, nodeId, resourceOption);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public AppsInfo getApps(HttpServletRequest hsr, String stateQuery,
|
public AppsInfo getApps(HttpServletRequest hsr, String stateQuery,
|
||||||
Set<String> statesQuery, String finalStatusQuery, String userQuery,
|
Set<String> statesQuery, String finalStatusQuery, String userQuery,
|
||||||
|
@ -25,6 +25,8 @@ import java.util.List;
|
|||||||
import javax.ws.rs.core.Response;
|
import javax.ws.rs.core.Response;
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ResourceOption;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager;
|
import org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager;
|
||||||
@ -40,6 +42,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsIn
|
|||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NewApplication;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NewApplication;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeInfo;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeInfo;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodesInfo;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodesInfo;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceOptionInfo;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
@ -423,6 +427,24 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest {
|
|||||||
// The remove duplicate operations is tested in TestRouterWebServiceUtil
|
// The remove duplicate operations is tested in TestRouterWebServiceUtil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This test validates the correctness of updateNodeResource().
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testUpdateNodeResource() {
|
||||||
|
List<NodeInfo> nodes = interceptor.getNodes(null).getNodes();
|
||||||
|
Assert.assertFalse(nodes.isEmpty());
|
||||||
|
final String nodeId = nodes.get(0).getNodeId();
|
||||||
|
ResourceOptionInfo resourceOption = new ResourceOptionInfo(
|
||||||
|
ResourceOption.newInstance(
|
||||||
|
Resource.newInstance(2048, 3), 1000));
|
||||||
|
ResourceInfo resource = interceptor.updateNodeResource(
|
||||||
|
null, nodeId, resourceOption);
|
||||||
|
Assert.assertNotNull(resource);
|
||||||
|
Assert.assertEquals(2048, resource.getMemorySize());
|
||||||
|
Assert.assertEquals(3, resource.getvCores());
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This test validates the correctness of getClusterMetricsInfo in case each
|
* This test validates the correctness of getClusterMetricsInfo in case each
|
||||||
* SubCluster provided a ClusterMetricsInfo with appsSubmitted set to the
|
* SubCluster provided a ClusterMetricsInfo with appsSubmitted set to the
|
||||||
|
@ -44,6 +44,7 @@ import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.IN
|
|||||||
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.LABELS;
|
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.LABELS;
|
||||||
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.LABEL_MAPPINGS;
|
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.LABEL_MAPPINGS;
|
||||||
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.METRICS;
|
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.METRICS;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.NODE_RESOURCE;
|
||||||
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.NODES;
|
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.NODES;
|
||||||
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.NODES_NODEID;
|
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.NODES_NODEID;
|
||||||
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.NODES_NODEID_GETLABELS;
|
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.NODES_NODEID_GETLABELS;
|
||||||
@ -63,6 +64,7 @@ import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.ST
|
|||||||
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.TIME;
|
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.TIME;
|
||||||
import static org.apache.hadoop.yarn.server.router.webapp.HTTPMethods.POST;
|
import static org.apache.hadoop.yarn.server.router.webapp.HTTPMethods.POST;
|
||||||
import static org.apache.hadoop.yarn.server.router.webapp.HTTPMethods.PUT;
|
import static org.apache.hadoop.yarn.server.router.webapp.HTTPMethods.PUT;
|
||||||
|
import static org.apache.hadoop.yarn.webapp.WebServicesTestUtils.assertResponseStatusCode;
|
||||||
import static org.apache.hadoop.yarn.webapp.util.WebAppUtils.getNMWebAppURLWithoutScheme;
|
import static org.apache.hadoop.yarn.webapp.util.WebAppUtils.getNMWebAppURLWithoutScheme;
|
||||||
import static org.apache.hadoop.yarn.webapp.util.WebAppUtils.getRMWebAppURLWithScheme;
|
import static org.apache.hadoop.yarn.webapp.util.WebAppUtils.getRMWebAppURLWithScheme;
|
||||||
import static org.apache.hadoop.yarn.webapp.util.WebAppUtils.getRouterWebAppURLWithScheme;
|
import static org.apache.hadoop.yarn.webapp.util.WebAppUtils.getRouterWebAppURLWithScheme;
|
||||||
@ -87,6 +89,8 @@ import org.apache.hadoop.security.UserGroupInformation;
|
|||||||
import org.apache.hadoop.test.GenericTestUtils;
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
import org.apache.hadoop.util.concurrent.HadoopExecutors;
|
import org.apache.hadoop.util.concurrent.HadoopExecutors;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeLabel;
|
import org.apache.hadoop.yarn.api.records.NodeLabel;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ResourceOption;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
|
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
||||||
@ -116,11 +120,13 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodesInfo;
|
|||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDeleteRequestInfo;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDeleteRequestInfo;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationSubmissionRequestInfo;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationSubmissionRequestInfo;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationUpdateRequestInfo;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationUpdateRequestInfo;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceOptionInfo;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
|
||||||
import org.apache.hadoop.yarn.server.router.Router;
|
import org.apache.hadoop.yarn.server.router.Router;
|
||||||
import org.apache.hadoop.yarn.server.webapp.WebServices;
|
import org.apache.hadoop.yarn.server.webapp.WebServices;
|
||||||
import org.apache.hadoop.yarn.server.webapp.dao.AppsInfo;
|
import org.apache.hadoop.yarn.server.webapp.dao.AppsInfo;
|
||||||
import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
|
import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
|
||||||
|
import org.codehaus.jettison.json.JSONObject;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
@ -131,6 +137,7 @@ import com.sun.jersey.api.client.Client;
|
|||||||
import com.sun.jersey.api.client.ClientHandlerException;
|
import com.sun.jersey.api.client.ClientHandlerException;
|
||||||
import com.sun.jersey.api.client.ClientResponse;
|
import com.sun.jersey.api.client.ClientResponse;
|
||||||
import com.sun.jersey.api.client.WebResource;
|
import com.sun.jersey.api.client.WebResource;
|
||||||
|
import com.sun.jersey.api.client.ClientResponse.Status;
|
||||||
import com.sun.jersey.api.client.WebResource.Builder;
|
import com.sun.jersey.api.client.WebResource.Builder;
|
||||||
|
|
||||||
import net.jcip.annotations.NotThreadSafe;
|
import net.jcip.annotations.NotThreadSafe;
|
||||||
@ -467,6 +474,47 @@ public class TestRouterWebServicesREST {
|
|||||||
routerResponse.getVersion());
|
routerResponse.getVersion());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This test validates the correctness of
|
||||||
|
* {@link RMWebServiceProtocol#updateNodeResources()} inside Router.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testUpdateNodeResource() throws Exception {
|
||||||
|
|
||||||
|
// wait until a node shows up and check the resources
|
||||||
|
GenericTestUtils.waitFor(() -> getNodeId() != null, 100, 5 * 1000);
|
||||||
|
String nodeId = getNodeId();
|
||||||
|
|
||||||
|
// assert memory and default vcores
|
||||||
|
List<NodeInfo> responses0 = performGetCalls(
|
||||||
|
RM_WEB_SERVICE_PATH + format(NODES_NODEID, getNodeId()),
|
||||||
|
NodeInfo.class, null, null);
|
||||||
|
NodeInfo nodeInfo0 = responses0.get(0);
|
||||||
|
assertEquals(8192, nodeInfo0.getTotalResource().getMemorySize());
|
||||||
|
assertEquals(8, nodeInfo0.getTotalResource().getvCores());
|
||||||
|
|
||||||
|
// update memory to 4096MB and 5 cores
|
||||||
|
Resource resource = Resource.newInstance(4096, 5);
|
||||||
|
ResourceOptionInfo resourceOption = new ResourceOptionInfo(
|
||||||
|
ResourceOption.newInstance(resource, 1000));
|
||||||
|
ClientResponse routerResponse = performCall(
|
||||||
|
RM_WEB_SERVICE_PATH + format(NODE_RESOURCE, nodeId),
|
||||||
|
null, null, resourceOption, POST);
|
||||||
|
assertResponseStatusCode(Status.OK, routerResponse.getStatusInfo());
|
||||||
|
JSONObject json = routerResponse.getEntity(JSONObject.class);
|
||||||
|
JSONObject totalResource = json.getJSONObject("resourceInfo");
|
||||||
|
assertEquals(resource.getMemorySize(), totalResource.getLong("memory"));
|
||||||
|
assertEquals(resource.getVirtualCores(), totalResource.getLong("vCores"));
|
||||||
|
|
||||||
|
// assert updated memory and cores
|
||||||
|
List<NodeInfo> responses1 = performGetCalls(
|
||||||
|
RM_WEB_SERVICE_PATH + format(NODES_NODEID, getNodeId()),
|
||||||
|
NodeInfo.class, null, null);
|
||||||
|
NodeInfo nodeInfo1 = responses1.get(0);
|
||||||
|
assertEquals(4096, nodeInfo1.getTotalResource().getMemorySize());
|
||||||
|
assertEquals(5, nodeInfo1.getTotalResource().getvCores());
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This test validates the correctness of
|
* This test validates the correctness of
|
||||||
* {@link RMWebServiceProtocol#getActivities()} inside Router.
|
* {@link RMWebServiceProtocol#getActivities()} inside Router.
|
||||||
@ -1338,7 +1386,11 @@ public class TestRouterWebServicesREST {
|
|||||||
ClientResponse response =
|
ClientResponse response =
|
||||||
toRM.accept(APPLICATION_XML).get(ClientResponse.class);
|
toRM.accept(APPLICATION_XML).get(ClientResponse.class);
|
||||||
NodesInfo ci = response.getEntity(NodesInfo.class);
|
NodesInfo ci = response.getEntity(NodesInfo.class);
|
||||||
return ci.getNodes().get(0).getNodeId();
|
List<NodeInfo> nodes = ci.getNodes();
|
||||||
|
if (nodes.isEmpty()) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
return nodes.get(0).getNodeId();
|
||||||
}
|
}
|
||||||
|
|
||||||
private NewApplication getNewApplicationId() {
|
private NewApplication getNewApplicationId() {
|
||||||
|
@ -2817,6 +2817,7 @@ Use the following URI to obtain a Node Object, from a node identified by the nod
|
|||||||
| usedVirtualCores | long | The total number of vCores currently used on the node |
|
| usedVirtualCores | long | The total number of vCores currently used on the node |
|
||||||
| availableVirtualCores | long | The total number of vCores available on the node |
|
| availableVirtualCores | long | The total number of vCores available on the node |
|
||||||
| resourceUtilization | object | Resource utilization on the node |
|
| resourceUtilization | object | Resource utilization on the node |
|
||||||
|
| totalResource | object | Resources on the node |
|
||||||
|
|
||||||
The *resourceUtilization* object contains the following elements:
|
The *resourceUtilization* object contains the following elements:
|
||||||
|
|
||||||
@ -2871,6 +2872,11 @@ Response Body:
|
|||||||
"aggregatedContainersPhysicalMemoryMB": 0,
|
"aggregatedContainersPhysicalMemoryMB": 0,
|
||||||
"aggregatedContainersVirtualMemoryMB": 0,
|
"aggregatedContainersVirtualMemoryMB": 0,
|
||||||
"containersCPUUsage": 0
|
"containersCPUUsage": 0
|
||||||
|
},
|
||||||
|
"totalResource":
|
||||||
|
{
|
||||||
|
"memory": 2048,
|
||||||
|
"vCores": 5
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -2916,9 +2922,117 @@ Response Body:
|
|||||||
<aggregatedContainersVirtualMemoryMB>0</aggregatedContainersVirtualMemoryMB>
|
<aggregatedContainersVirtualMemoryMB>0</aggregatedContainersVirtualMemoryMB>
|
||||||
<containersCPUUsage>0.0</containersCPUUsage>
|
<containersCPUUsage>0.0</containersCPUUsage>
|
||||||
</resourceUtilization>
|
</resourceUtilization>
|
||||||
|
<totalResource>
|
||||||
|
<memory>2048</memory>
|
||||||
|
<vCores>5</vCores>
|
||||||
|
</totalResource>
|
||||||
</node>
|
</node>
|
||||||
```
|
```
|
||||||
|
|
||||||
|
Cluster Node Update Resource API
|
||||||
|
--------------------------------
|
||||||
|
|
||||||
|
Update the total resources in a node.
|
||||||
|
|
||||||
|
### URI
|
||||||
|
|
||||||
|
Use the following URI to update the resources of a Node Object identified by the nodeid value.
|
||||||
|
|
||||||
|
http://rm-http-address:port/ws/v1/cluster/nodes/{nodeid}/resource
|
||||||
|
|
||||||
|
### HTTP Operations Supported
|
||||||
|
|
||||||
|
POST
|
||||||
|
|
||||||
|
### Query Parameters Supported
|
||||||
|
|
||||||
|
None
|
||||||
|
|
||||||
|
### Elements of the *resourceOption* object
|
||||||
|
|
||||||
|
| Item | Data Type | Description |
|
||||||
|
|:---- |:---- |:---- |
|
||||||
|
| memory | long | The total amount of memory to set on the node (in MB) |
|
||||||
|
| vcores | long | The total number of vCores to set on the node |
|
||||||
|
| overCommitTimeout | long | The timeout to preempt containers |
|
||||||
|
|
||||||
|
### Response Examples
|
||||||
|
|
||||||
|
**JSON response**
|
||||||
|
|
||||||
|
HTTP Request:
|
||||||
|
|
||||||
|
POST http://rm-http-address:port/ws/v1/cluster/nodes/h2:1235/resource
|
||||||
|
|
||||||
|
Request body:
|
||||||
|
|
||||||
|
```json
|
||||||
|
{
|
||||||
|
"resource":
|
||||||
|
{
|
||||||
|
"memory": 1024,
|
||||||
|
"vCores": 3
|
||||||
|
},
|
||||||
|
"overCommitTimeout": -1
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
Response Header:
|
||||||
|
|
||||||
|
HTTP/1.1 200 OK
|
||||||
|
Content-Type: application/json
|
||||||
|
Transfer-Encoding: chunked
|
||||||
|
Server: Jetty(6.1.26)
|
||||||
|
|
||||||
|
Response Body:
|
||||||
|
|
||||||
|
```json
|
||||||
|
{
|
||||||
|
"resourceInfo":
|
||||||
|
{
|
||||||
|
"memory": 8192,
|
||||||
|
"vCores": 5
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
**XML response**
|
||||||
|
|
||||||
|
HTTP Request:
|
||||||
|
|
||||||
|
GET http://rm-http-address:port/ws/v1/cluster/node/h2:1235/resource
|
||||||
|
Accept: application/xml
|
||||||
|
|
||||||
|
Request body:
|
||||||
|
|
||||||
|
```xml
|
||||||
|
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
|
||||||
|
<resourceOption>
|
||||||
|
<resource>
|
||||||
|
<memory>8192</memory>
|
||||||
|
<vCores>5</vCores>
|
||||||
|
</resource>
|
||||||
|
<overCommitTimeout>1000</overCommitTimeout>
|
||||||
|
</resourceOption>
|
||||||
|
```
|
||||||
|
|
||||||
|
Response Header:
|
||||||
|
|
||||||
|
HTTP/1.1 200 OK
|
||||||
|
Content-Type: application/xml
|
||||||
|
Content-Length: 552
|
||||||
|
Server: Jetty(6.1.26)
|
||||||
|
|
||||||
|
Response Body:
|
||||||
|
|
||||||
|
```xml
|
||||||
|
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
|
||||||
|
<resourceInfo>
|
||||||
|
<memory>8192</memory>
|
||||||
|
<vCores>5</vCores>
|
||||||
|
</resourceInfo>
|
||||||
|
```
|
||||||
|
|
||||||
Cluster Writeable APIs
|
Cluster Writeable APIs
|
||||||
----------------------
|
----------------------
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user