YARN-313. Add Admin API for supporting node resource configuration in command line. (Contributed by Inigo Goiri, Kenji Kikushima and Junping Du)

This commit is contained in:
Junping Du 2015-09-15 07:55:59 -07:00
parent a440567491
commit 73e3a49eb0
21 changed files with 642 additions and 23 deletions

View File

@ -190,6 +190,10 @@ Release 2.8.0 - UNRELEASED
YARN-2884. Added a proxy service in NM to proxy the the communication
between AM and RM. (Kishore Chaliparambil via jianhe)
YARN-313. Add Admin API for supporting node resource configuration in
command line. (Inigo Goiri, Kenji Kikushima and Junping Du
via junping_du)
IMPROVEMENTS
YARN-644. Basic null check is not performed on passed in arguments before

View File

@ -36,6 +36,9 @@ public static ResourceOption newInstance(Resource resource,
return resourceOption;
}
/** Negative value means no timeout. */
public static final int OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT = -1;
/**
* Get the <em>resource</em> of the ResourceOption.
* @return <em>resource</em> of the ResourceOption

View File

@ -38,6 +38,9 @@
@Evolving
public class YarnConfiguration extends Configuration {
@Private
public static final String DR_CONFIGURATION_FILE= "dynamic-resources.xml";
@Private
public static final String CS_CONFIGURATION_FILE= "capacity-scheduler.xml";
@ -57,6 +60,7 @@ public class YarnConfiguration extends Configuration {
@Private
public static final List<String> RM_CONFIGURATION_FILES =
Collections.unmodifiableList(Arrays.asList(
DR_CONFIGURATION_FILE,
CS_CONFIGURATION_FILE,
HADOOP_POLICY_CONFIGURATION_FILE,
YARN_SITE_CONFIGURATION_FILE,

View File

@ -21,6 +21,7 @@
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.io.retry.Idempotent;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.tools.GetUserMappingsProtocol;
@ -51,6 +52,8 @@
import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResourcesRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResourcesResponse;
@Private
public interface ResourceManagerAdministrationProtocol extends GetUserMappingsProtocol {
@ -74,7 +77,7 @@ public RefreshNodesResponse refreshNodes(RefreshNodesRequest request)
@Private
@Idempotent
public RefreshUserToGroupsMappingsResponse refreshUserToGroupsMappings(
RefreshUserToGroupsMappingsResponse refreshUserToGroupsMappings(
RefreshUserToGroupsMappingsRequest request)
throws StandbyException, YarnException, IOException;
@ -107,9 +110,14 @@ public RefreshServiceAclsResponse refreshServiceAcls(
@Private
@Idempotent
public UpdateNodeResourceResponse updateNodeResource(
UpdateNodeResourceRequest request)
throws YarnException, IOException;
UpdateNodeResourceRequest request) throws YarnException, IOException;
@Private
@Evolving
@Idempotent
public RefreshNodesResourcesResponse refreshNodesResources(
RefreshNodesResourcesRequest request) throws YarnException, IOException;
@Private
@Idempotent
public AddToClusterNodeLabelsResponse addToClusterNodeLabels(

View File

@ -0,0 +1,39 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.api.protocolrecords;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.yarn.util.Records;
/**
* Request to refresh the resources of a node.
*/
@Private
@Evolving
public abstract class RefreshNodesResourcesRequest {
@Public
@Evolving
public static RefreshNodesResourcesRequest newInstance() {
RefreshNodesResourcesRequest request =
Records.newRecord(RefreshNodesResourcesRequest.class);
return request;
}
}

View File

@ -0,0 +1,39 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.api.protocolrecords;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.util.Records;
/**
* Response to a request to refresh the resources of a node.
*/
@Private
@Evolving
public abstract class RefreshNodesResourcesResponse {
@Private
@Unstable
public static RefreshNodesResourcesResponse newInstance() {
RefreshNodesResourcesResponse response =
Records.newRecord(RefreshNodesResourcesResponse.class);
return response;
}
}

View File

@ -38,7 +38,8 @@ service ResourceManagerAdministrationProtocolService {
rpc refreshAdminAcls(RefreshAdminAclsRequestProto) returns (RefreshAdminAclsResponseProto);
rpc refreshServiceAcls(RefreshServiceAclsRequestProto) returns (RefreshServiceAclsResponseProto);
rpc getGroupsForUser(GetGroupsForUserRequestProto) returns (GetGroupsForUserResponseProto);
rpc updateNodeResource (UpdateNodeResourceRequestProto) returns (UpdateNodeResourceResponseProto);
rpc updateNodeResource(UpdateNodeResourceRequestProto) returns (UpdateNodeResourceResponseProto);
rpc refreshNodesResources(RefreshNodesResourcesRequestProto) returns (RefreshNodesResourcesResponseProto);
rpc addToClusterNodeLabels(AddToClusterNodeLabelsRequestProto) returns (AddToClusterNodeLabelsResponseProto);
rpc removeFromClusterNodeLabels(RemoveFromClusterNodeLabelsRequestProto) returns (RemoveFromClusterNodeLabelsResponseProto);
rpc replaceLabelsOnNodes(ReplaceLabelsOnNodeRequestProto) returns (ReplaceLabelsOnNodeResponseProto);

View File

@ -76,6 +76,12 @@ message UpdateNodeResourceRequestProto {
message UpdateNodeResourceResponseProto {
}
message RefreshNodesResourcesRequestProto {
}
message RefreshNodesResourcesResponseProto {
}
message AddToClusterNodeLabelsRequestProto {
repeated NodeLabelProto nodeLabels = 1;
}

View File

@ -41,6 +41,8 @@
import org.apache.hadoop.yarn.api.records.DecommissionType;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.client.ClientRMProxy;
import org.apache.hadoop.yarn.client.RMHAServiceTarget;
import org.apache.hadoop.yarn.conf.HAUtil;
@ -58,12 +60,15 @@
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResourcesRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
@ -98,6 +103,8 @@ public class RMAdminCLI extends HAAdmin {
+ "[-g [timeout in seconds] is optional, if we specify the "
+ "timeout then ResourceManager will wait for timeout before "
+ "marking the NodeManager as decommissioned."))
.put("-refreshNodesResources", new UsageInfo("",
"Refresh resources of NodeManagers at the ResourceManager."))
.put("-refreshSuperUserGroupsConfiguration", new UsageInfo("",
"Refresh superuser proxy groups mappings"))
.put("-refreshUserToGroupsMappings", new UsageInfo("",
@ -136,7 +143,10 @@ public class RMAdminCLI extends HAAdmin {
.put("-refreshClusterMaxPriority",
new UsageInfo("",
"Refresh cluster max priority"))
.build();
.put("-updateNodeResource",
new UsageInfo("[NodeID] [MemSize] [vCores] ([OvercommitTimeout])",
"Update resource on specific node."))
.build();
public RMAdminCLI() {
super();
@ -221,6 +231,7 @@ private static void printHelp(String cmd, boolean isHAEnabled) {
"yarn rmadmin" +
" [-refreshQueues]" +
" [-refreshNodes [-g [timeout in seconds]]]" +
" [-refreshNodesResources]" +
" [-refreshSuperUserGroupsConfiguration]" +
" [-refreshUserToGroupsMappings]" +
" [-refreshAdminAcls]" +
@ -230,7 +241,8 @@ private static void printHelp(String cmd, boolean isHAEnabled) {
+ "label2(exclusive=false),label3\">]" +
" [-removeFromClusterNodeLabels <label1,label2,label3>]" +
" [-replaceLabelsOnNode <\"node1[:port]=label1,label2 node2[:port]=label1\">]" +
" [-directlyAccessNodeLabelStore]]");
" [-directlyAccessNodeLabelStore]]" +
" [-updateNodeResource [NodeID] [MemSize] [vCores] ([OvercommitTimeout])");
if (isHAEnabled) {
appendHAUsage(summary);
}
@ -348,6 +360,15 @@ private int refreshNodes(long timeout) throws IOException, YarnException {
return 0;
}
private int refreshNodesResources() throws IOException, YarnException {
// Refresh the resources at the Nodemanager
ResourceManagerAdministrationProtocol adminProtocol = createAdminProtocol();
RefreshNodesResourcesRequest request =
recordFactory.newRecordInstance(RefreshNodesResourcesRequest.class);
adminProtocol.refreshNodesResources(request);
return 0;
}
private int refreshUserToGroupsMappings() throws IOException,
YarnException {
// Refresh the user-to-groups mappings
@ -395,6 +416,22 @@ private int refreshClusterMaxPriority() throws IOException, YarnException {
return 0;
}
private int updateNodeResource(String nodeIdStr, int memSize,
int cores, int overCommitTimeout) throws IOException, YarnException {
// Refresh the nodes
ResourceManagerAdministrationProtocol adminProtocol = createAdminProtocol();
UpdateNodeResourceRequest request =
recordFactory.newRecordInstance(UpdateNodeResourceRequest.class);
NodeId nodeId = ConverterUtils.toNodeId(nodeIdStr);
Resource resource = Resources.createResource(memSize, cores);
Map<NodeId, ResourceOption> resourceMap =
new HashMap<NodeId, ResourceOption>();
resourceMap.put(
nodeId, ResourceOption.newInstance(resource, overCommitTimeout));
adminProtocol.updateNodeResource(request);
return 0;
}
private int getGroups(String[] usernames) throws IOException {
// Get groups users belongs to
ResourceManagerAdministrationProtocol adminProtocol = createAdminProtocol();
@ -653,6 +690,7 @@ public int run(String[] args) throws Exception {
// verify that we have enough command line parameters
//
if ("-refreshAdminAcls".equals(cmd) || "-refreshQueues".equals(cmd) ||
"-refreshNodesResources".equals(cmd) ||
"-refreshServiceAcl".equals(cmd) ||
"-refreshUserToGroupsMappings".equals(cmd) ||
"-refreshSuperUserGroupsConfiguration".equals(cmd)) {
@ -681,6 +719,8 @@ public int run(String[] args) throws Exception {
printUsage(cmd, isHAEnabled);
return -1;
}
} else if ("-refreshNodesResources".equals(cmd)) {
exitCode = refreshNodesResources();
} else if ("-refreshUserToGroupsMappings".equals(cmd)) {
exitCode = refreshUserToGroupsMappings();
} else if ("-refreshSuperUserGroupsConfiguration".equals(cmd)) {
@ -694,6 +734,23 @@ public int run(String[] args) throws Exception {
} else if ("-getGroups".equals(cmd)) {
String[] usernames = Arrays.copyOfRange(args, i, args.length);
exitCode = getGroups(usernames);
} else if ("-updateNodeResource".equals(cmd)) {
if (args.length < 4 || args.length > 5) {
System.err.println("Number of parameters specified for " +
"updateNodeResource is wrong.");
printUsage(cmd, isHAEnabled);
exitCode = -1;
} else {
String nodeID = args[i++];
String memSize = args[i++];
String cores = args[i++];
int overCommitTimeout = ResourceOption.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT;
if (i == args.length - 1) {
overCommitTimeout = Integer.parseInt(args[i]);
}
exitCode = updateNodeResource(nodeID, Integer.parseInt(memSize),
Integer.parseInt(cores), overCommitTimeout);
}
} else if ("-addToClusterNodeLabels".equals(cmd)) {
if (i >= args.length) {
System.err.println(NO_LABEL_ERR_MSG);

View File

@ -204,7 +204,6 @@ public void testRefreshNodesWithGracefulTimeout() throws Exception {
when(admin.checkForDecommissioningNodes(any(
CheckForDecommissioningNodesRequest.class))).thenReturn(response);
assertEquals(0, rmAdminCLI.run(args));
// verify(admin).refreshNodes(any(RefreshNodesRequest.class));
verify(admin).refreshNodes(
RefreshNodesRequest.newInstance(DecommissionType.GRACEFUL));
@ -343,12 +342,17 @@ public void testHelp() throws Exception {
assertTrue(dataOut
.toString()
.contains(
"yarn rmadmin [-refreshQueues] [-refreshNodes [-g [timeout in seconds]]] [-refreshSuper" +
"UserGroupsConfiguration] [-refreshUserToGroupsMappings] " +
"[-refreshAdminAcls] [-refreshServiceAcl] [-getGroup" +
" [username]] [-addToClusterNodeLabels <\"label1(exclusive=true),label2(exclusive=false),label3\">]" +
" [-removeFromClusterNodeLabels <label1,label2,label3>] [-replaceLabelsOnNode " +
"<\"node1[:port]=label1,label2 node2[:port]=label1\">] [-directlyAccessNodeLabelStore]] " +
"yarn rmadmin [-refreshQueues] [-refreshNodes [-g [timeout in " +
"seconds]]] [-refreshNodesResources] [-refreshSuperUserGroups" +
"Configuration] [-refreshUserToGroupsMappings] " +
"[-refreshAdminAcls] [-refreshServiceAcl] [-getGroup " +
"[username]] [-addToClusterNodeLabels " +
"<\"label1(exclusive=true),label2(exclusive=false),label3\">] " +
"[-removeFromClusterNodeLabels <label1,label2,label3>] " +
"[-replaceLabelsOnNode " +
"<\"node1[:port]=label1,label2 node2[:port]=label1\">] " +
"[-directlyAccessNodeLabelStore]] [-updateNodeResource " +
"[NodeID] [MemSize] [vCores] ([OvercommitTimeout]) " +
"[-help [cmd]]"));
assertTrue(dataOut
.toString()
@ -360,6 +364,11 @@ public void testHelp() throws Exception {
.contains(
"-refreshNodes [-g [timeout in seconds]]: Refresh the hosts information at the " +
"ResourceManager."));
assertTrue(dataOut
.toString()
.contains(
"-refreshNodesResources: Refresh resources of NodeManagers at the " +
"ResourceManager."));
assertTrue(dataOut.toString().contains(
"-refreshUserToGroupsMappings: Refresh user-to-groups mappings"));
assertTrue(dataOut
@ -387,6 +396,8 @@ public void testHelp() throws Exception {
"Usage: yarn rmadmin [-refreshQueues]", dataErr, 0);
testError(new String[] { "-help", "-refreshNodes" },
"Usage: yarn rmadmin [-refreshNodes [-g [timeout in seconds]]]", dataErr, 0);
testError(new String[] { "-help", "-refreshNodesResources" },
"Usage: yarn rmadmin [-refreshNodesResources]", dataErr, 0);
testError(new String[] { "-help", "-refreshUserToGroupsMappings" },
"Usage: yarn rmadmin [-refreshUserToGroupsMappings]", dataErr, 0);
testError(
@ -423,13 +434,15 @@ public void testHelp() throws Exception {
assertEquals(0, rmAdminCLIWithHAEnabled.run(args));
oldOutPrintStream.println(dataOut);
String expectedHelpMsg =
"yarn rmadmin [-refreshQueues] [-refreshNodes [-g [timeout in seconds]]] [-refreshSuper"
+ "UserGroupsConfiguration] [-refreshUserToGroupsMappings] "
"yarn rmadmin [-refreshQueues] [-refreshNodes [-g [timeout in seconds]]] "
+ "[-refreshNodesResources] [-refreshSuperUserGroupsConfiguration] "
+ "[-refreshUserToGroupsMappings] "
+ "[-refreshAdminAcls] [-refreshServiceAcl] [-getGroup"
+ " [username]] [-addToClusterNodeLabels <\"label1(exclusive=true),"
+ "label2(exclusive=false),label3\">]"
+ " [-removeFromClusterNodeLabels <label1,label2,label3>] [-replaceLabelsOnNode "
+ "<\"node1[:port]=label1,label2 node2[:port]=label1\">] [-directlyAccessNodeLabelStore]] "
+ "[-updateNodeResource [NodeID] [MemSize] [vCores] ([OvercommitTimeout]) "
+ "[-transitionToActive [--forceactive] <serviceId>] "
+ "[-transitionToStandby <serviceId>] "
+ "[-getServiceState <serviceId>] [-checkHealth <serviceId>] [-help [cmd]]";

View File

@ -43,6 +43,7 @@
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RemoveFromClusterNodeLabelsRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ReplaceLabelsOnNodeRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.UpdateNodeResourceRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshNodesResourcesRequestProto;
import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocolPB;
import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest;
@ -54,6 +55,8 @@
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResourcesRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResourcesResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesResponse;
@ -93,6 +96,8 @@
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReplaceLabelsOnNodeResponsePBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.UpdateNodeResourceRequestPBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.UpdateNodeResourceResponsePBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshNodesResourcesRequestPBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshNodesResourcesResponsePBImpl;
import com.google.protobuf.ServiceException;
@ -231,6 +236,20 @@ public UpdateNodeResourceResponse updateNodeResource(
}
}
@Override
public RefreshNodesResourcesResponse refreshNodesResources(
RefreshNodesResourcesRequest request) throws YarnException, IOException {
RefreshNodesResourcesRequestProto requestProto =
((RefreshNodesResourcesRequestPBImpl)request).getProto();
try {
return new RefreshNodesResourcesResponsePBImpl(
proxy.refreshNodesResources(null, requestProto));
} catch (ServiceException e) {
RPCUtil.unwrapAndThrowException(e);
return null;
}
}
@Override
public AddToClusterNodeLabelsResponse addToClusterNodeLabels(
AddToClusterNodeLabelsRequest request) throws YarnException, IOException {

View File

@ -48,6 +48,8 @@
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ReplaceLabelsOnNodeResponseProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.UpdateNodeResourceRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.UpdateNodeResourceResponseProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshNodesResourcesRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshNodesResourcesResponseProto;
import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocolPB;
import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse;
@ -56,6 +58,7 @@
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResourcesResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsResponse;
@ -88,6 +91,8 @@
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReplaceLabelsOnNodeResponsePBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.UpdateNodeResourceRequestPBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.UpdateNodeResourceResponsePBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshNodesResourcesRequestPBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshNodesResourcesResponsePBImpl;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
@ -231,6 +236,23 @@ public UpdateNodeResourceResponseProto updateNodeResource(RpcController controll
}
}
@Override
public RefreshNodesResourcesResponseProto refreshNodesResources(
RpcController controller, RefreshNodesResourcesRequestProto proto)
throws ServiceException {
RefreshNodesResourcesRequestPBImpl request =
new RefreshNodesResourcesRequestPBImpl(proto);
try {
RefreshNodesResourcesResponse response =
real.refreshNodesResources(request);
return ((RefreshNodesResourcesResponsePBImpl)response).getProto();
} catch (YarnException e) {
throw new ServiceException(e);
} catch (IOException e) {
throw new ServiceException(e);
}
}
@Override
public AddToClusterNodeLabelsResponseProto addToClusterNodeLabels(
RpcController controller, AddToClusterNodeLabelsRequestProto proto)

View File

@ -0,0 +1,72 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshNodesResourcesRequestProto;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResourcesRequest;
import com.google.protobuf.TextFormat;
@Private
@Unstable
public class RefreshNodesResourcesRequestPBImpl extends RefreshNodesResourcesRequest {
RefreshNodesResourcesRequestProto proto =
RefreshNodesResourcesRequestProto.getDefaultInstance();
RefreshNodesResourcesRequestProto.Builder builder = null;
boolean viaProto = false;
public RefreshNodesResourcesRequestPBImpl() {
builder = RefreshNodesResourcesRequestProto.newBuilder();
}
public RefreshNodesResourcesRequestPBImpl(
RefreshNodesResourcesRequestProto proto) {
this.proto = proto;
viaProto = true;
}
public RefreshNodesResourcesRequestProto getProto() {
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
@Override
public int hashCode() {
return getProto().hashCode();
}
@Override
public boolean equals(Object other) {
if (other == null)
return false;
if (other.getClass().isAssignableFrom(this.getClass())) {
return this.getProto().equals(this.getClass().cast(other).getProto());
}
return false;
}
@Override
public String toString() {
return TextFormat.shortDebugString(getProto());
}
}

View File

@ -0,0 +1,72 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshNodesResourcesResponseProto;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResourcesResponse;
import com.google.protobuf.TextFormat;
@Private
@Unstable
public class RefreshNodesResourcesResponsePBImpl extends RefreshNodesResourcesResponse {
RefreshNodesResourcesResponseProto proto =
RefreshNodesResourcesResponseProto.getDefaultInstance();
RefreshNodesResourcesResponseProto.Builder builder = null;
boolean viaProto = false;
public RefreshNodesResourcesResponsePBImpl() {
builder = RefreshNodesResourcesResponseProto.newBuilder();
}
public RefreshNodesResourcesResponsePBImpl(
RefreshNodesResourcesResponseProto proto) {
this.proto = proto;
viaProto = true;
}
public RefreshNodesResourcesResponseProto getProto() {
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
@Override
public int hashCode() {
return getProto().hashCode();
}
@Override
public boolean equals(Object other) {
if (other == null)
return false;
if (other.getClass().isAssignableFrom(this.getClass())) {
return this.getProto().equals(this.getClass().cast(other).getProto());
}
return false;
}
@Override
public String toString() {
return TextFormat.shortDebugString(getProto());
}
}

View File

@ -220,6 +220,8 @@
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshAdminAclsRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshAdminAclsResponseProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshNodesRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshNodesResourcesRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshNodesResourcesResponseProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshNodesResponseProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshQueuesRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshQueuesResponseProto;
@ -296,6 +298,8 @@
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshAdminAclsRequestPBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshAdminAclsResponsePBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshNodesRequestPBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshNodesResourcesRequestPBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshNodesResourcesResponsePBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshNodesResponsePBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshQueuesRequestPBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshQueuesResponsePBImpl;
@ -1126,6 +1130,18 @@ public void testRefreshQueuesResponsePBImpl() throws Exception {
RefreshQueuesResponseProto.class);
}
@Test
public void testRefreshNodesResourcesRequestPBImpl() throws Exception {
validatePBImplRecord(RefreshNodesResourcesRequestPBImpl.class,
RefreshNodesResourcesRequestProto.class);
}
@Test
public void testRefreshNodesResourcesResponsePBImpl() throws Exception {
validatePBImplRecord(RefreshNodesResourcesResponsePBImpl.class,
RefreshNodesResourcesResponseProto.class);
}
@Test
public void testRefreshServiceAclsRequestPBImpl() throws Exception {
validatePBImplRecord(RefreshServiceAclsRequestPBImpl.class,

View File

@ -72,6 +72,8 @@
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResourcesRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResourcesResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationRequest;
@ -85,6 +87,7 @@
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
import org.apache.hadoop.yarn.server.resourcemanager.resource.DynamicResourceConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent;
import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
@ -602,6 +605,55 @@ public UpdateNodeResourceResponse updateNodeResource(
return response;
}
@Override
public RefreshNodesResourcesResponse refreshNodesResources(
RefreshNodesResourcesRequest request)
throws YarnException, StandbyException {
String argName = "refreshNodesResources";
UserGroupInformation user = checkAcls(argName);
final String msg = "refresh nodes.";
checkRMStatus(user.getShortUserName(), argName, msg);
RefreshNodesResourcesResponse response =
recordFactory.newRecordInstance(RefreshNodesResourcesResponse.class);
try {
Configuration conf = getConfig();
Configuration configuration = new Configuration(conf);
DynamicResourceConfiguration newconf;
InputStream DRInputStream =
this.rmContext.getConfigurationProvider()
.getConfigurationInputStream(configuration,
YarnConfiguration.DR_CONFIGURATION_FILE);
if (DRInputStream != null) {
configuration.addResource(DRInputStream);
newconf = new DynamicResourceConfiguration(configuration, false);
} else {
newconf = new DynamicResourceConfiguration(configuration, true);
}
if (newconf.getNodes().length == 0) {
RMAuditLogger.logSuccess(user.getShortUserName(), argName,
"AdminService");
return response;
} else {
Map<NodeId, ResourceOption> nodeResourceMap =
newconf.getNodeResourceMap();
UpdateNodeResourceRequest updateRequest =
UpdateNodeResourceRequest.newInstance(nodeResourceMap);
updateNodeResource(updateRequest);
RMAuditLogger.logSuccess(user.getShortUserName(), argName,
"AdminService");
return response;
}
} catch (IOException ioe) {
throw logAndWrapException(ioe, user.getShortUserName(), argName, msg);
}
}
private synchronized Configuration getConfiguration(Configuration conf,
String... confFileNames) throws YarnException, IOException {
for (String confFileName : confFileNames) {

View File

@ -0,0 +1,149 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.resource;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
public class DynamicResourceConfiguration extends Configuration {
private static final Log LOG =
LogFactory.getLog(DynamicResourceConfiguration.class);
private static final String DR_CONFIGURATION_FILE = "dynamic-resources.xml";
@Private
public static final String PREFIX = "yarn.resource.dynamic.";
@Private
public static final String DOT = ".";
@Private
public static final String NODES = "nodes";
@Private
public static final String VCORES = "vcores";
@Private
public static final String MEMORY = "memory";
@Private
public static final String OVERCOMMIT_TIMEOUT = "overcommittimeout";
public DynamicResourceConfiguration() {
this(new Configuration());
}
public DynamicResourceConfiguration(Configuration configuration) {
this(configuration, true);
}
public DynamicResourceConfiguration(Configuration configuration,
boolean useLocalConfigurationProvider) {
super(configuration);
if (useLocalConfigurationProvider) {
addResource(DR_CONFIGURATION_FILE);
}
}
private String getNodePrefix(String node) {
String nodeName = PREFIX + node + DOT;
return nodeName;
}
public int getVcoresPerNode(String node) {
int vcoresPerNode =
getInt(getNodePrefix(node) + VCORES,
YarnConfiguration.DEFAULT_NM_VCORES);
return vcoresPerNode;
}
public void setVcoresPerNode(String node, int vcores) {
setInt(getNodePrefix(node) + VCORES, vcores);
LOG.debug("DRConf - setVcoresPerNode: nodePrefix=" + getNodePrefix(node) +
", vcores=" + vcores);
}
public int getMemoryPerNode(String node) {
int memoryPerNode =
getInt(getNodePrefix(node) + MEMORY,
YarnConfiguration.DEFAULT_NM_PMEM_MB);
return memoryPerNode;
}
public void setMemoryPerNode(String node, int memory) {
setInt(getNodePrefix(node) + MEMORY, memory);
LOG.debug("DRConf - setMemoryPerNode: nodePrefix=" + getNodePrefix(node) +
", memory=" + memory);
}
public int getOverCommitTimeoutPerNode(String node) {
int overCommitTimeoutPerNode =
getInt(getNodePrefix(node) + OVERCOMMIT_TIMEOUT,
ResourceOption.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT);
return overCommitTimeoutPerNode;
}
public void setOverCommitTimeoutPerNode(String node, int overCommitTimeout) {
setInt(getNodePrefix(node) + OVERCOMMIT_TIMEOUT, overCommitTimeout);
LOG.debug("DRConf - setOverCommitTimeoutPerNode: nodePrefix=" +
getNodePrefix(node) +
", overCommitTimeout=" + overCommitTimeout);
}
public String[] getNodes() {
String[] nodes = getStrings(PREFIX + NODES);
return nodes;
}
public void setNodes(String[] nodes) {
set(PREFIX + NODES, StringUtils.arrayToString(nodes));
}
public Map<NodeId, ResourceOption> getNodeResourceMap() {
String[] nodes = getNodes();
Map<NodeId, ResourceOption> resourceOptions
= new HashMap<NodeId, ResourceOption> ();
for (String node : nodes) {
NodeId nid = ConverterUtils.toNodeId(node);
int vcores = getVcoresPerNode(node);
int memory = getMemoryPerNode(node);
int overCommitTimeout = getOverCommitTimeoutPerNode(node);
Resource resource = Resources.createResource(memory, vcores);
ResourceOption resourceOption =
ResourceOption.newInstance(resource, overCommitTimeout);
resourceOptions.put(nid, resourceOption);
}
return resourceOptions;
}
}

View File

@ -37,9 +37,6 @@
*/
public interface RMNode {
/** negative value means no timeout */
public static final int OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT = -1;
/**
* the node id of of this node.
* @return the node id of this node.

View File

@ -58,11 +58,17 @@
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResourcesRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequest;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.resource.DynamicResourceConfiguration;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@ -82,6 +88,8 @@ public class TestRMAdminService {
static {
YarnConfiguration.addDefaultResource(
YarnConfiguration.CS_CONFIGURATION_FILE);
YarnConfiguration.addDefaultResource(
YarnConfiguration.DR_CONFIGURATION_FILE);
}
@Before
@ -168,6 +176,44 @@ public void testAdminRefreshQueuesWithFileSystemBasedConfigurationProvider()
Assert.assertTrue(maxAppsAfter != maxAppsBefore);
}
@Test
public void testAdminRefreshNodesResourcesWithFileSystemBasedConfigurationProvider()
throws IOException, YarnException {
configuration.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
"org.apache.hadoop.yarn.FileSystemBasedConfigurationProvider");
//upload default configurations
uploadDefaultConfiguration();
try {
rm = new MockRM(configuration);
rm.init(configuration);
rm.start();
rm.registerNode("h1:1234", 5120);
} catch(Exception ex) {
fail("Should not get any exceptions");
}
NodeId nid = ConverterUtils.toNodeId("h1:1234");
RMNode ni = rm.getRMContext().getRMNodes().get(nid);
Resource resource = ni.getTotalCapability();
Assert.assertEquals("<memory:5120, vCores:5>", resource.toString());
DynamicResourceConfiguration drConf =
new DynamicResourceConfiguration();
drConf.set("yarn.resource.dynamic.nodes", "h1:1234");
drConf.set("yarn.resource.dynamic.h1:1234.vcores", "4");
drConf.set("yarn.resource.dynamic.h1:1234.memory", "4096");
uploadConfiguration(drConf, "dynamic-resources.xml");
rm.adminService.refreshNodesResources(
RefreshNodesResourcesRequest.newInstance());
RMNode niAfter = rm.getRMContext().getRMNodes().get(nid);
Resource resourceAfter = niAfter.getTotalCapability();
Assert.assertEquals("<memory:4096, vCores:4>", resourceAfter.toString());
}
@Test
public void testAdminAclsWithLocalConfigurationProvider() {
rm = new MockRM(configuration);

View File

@ -659,7 +659,7 @@ public void testResourceUpdateOnRunningNode() {
assertEquals("CPU resource is not match.", oldCapacity.getVirtualCores(), 4);
node.handle(new RMNodeResourceUpdateEvent(node.getNodeID(),
ResourceOption.newInstance(Resource.newInstance(2048, 2),
RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT)));
ResourceOption.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT)));
Resource newCapacity = node.getTotalCapability();
assertEquals("Memory resource is not match.", newCapacity.getMemory(), 2048);
assertEquals("CPU resource is not match.", newCapacity.getVirtualCores(), 2);
@ -678,7 +678,7 @@ public void testResourceUpdateOnNewNode() {
assertEquals("CPU resource is not match.", oldCapacity.getVirtualCores(), 4);
node.handle(new RMNodeResourceUpdateEvent(node.getNodeID(),
ResourceOption.newInstance(Resource.newInstance(2048, 2),
RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT)));
ResourceOption.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT)));
Resource newCapacity = node.getTotalCapability();
assertEquals("Memory resource is not match.", newCapacity.getMemory(), 2048);
assertEquals("CPU resource is not match.", newCapacity.getVirtualCores(), 2);
@ -694,7 +694,7 @@ public void testResourceUpdateOnRebootedNode() {
assertEquals("CPU resource is not match.", oldCapacity.getVirtualCores(), 4);
node.handle(new RMNodeResourceUpdateEvent(node.getNodeID(),
ResourceOption.newInstance(Resource.newInstance(2048, 2),
RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT)));
ResourceOption.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT)));
Resource newCapacity = node.getTotalCapability();
assertEquals("Memory resource is not match.", newCapacity.getMemory(), 2048);
assertEquals("CPU resource is not match.", newCapacity.getVirtualCores(), 2);

View File

@ -327,7 +327,7 @@ public Map<NodeId, FiCaSchedulerNode> getNodes(){
NodeResourceUpdateSchedulerEvent node0ResourceUpdate = new
NodeResourceUpdateSchedulerEvent(node0, ResourceOption.newInstance(
newResource, RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT));
newResource, ResourceOption.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT));
scheduler.handle(node0ResourceUpdate);
// SchedulerNode's total resource and available resource are changed.