YARN-5434. Add -client|server argument for graceful decommission. Contributed by Robert Kanter.

This commit is contained in:
Junping Du 2016-07-29 10:26:11 -07:00
parent 204a2055b1
commit 95f2b98597
2 changed files with 115 additions and 23 deletions

View File

@ -98,11 +98,17 @@ public class RMAdminCLI extends HAAdmin {
"Reload the queues' acls, states and scheduler specific " +
"properties. \n\t\tResourceManager will reload the " +
"mapred-queues configuration file."))
.put("-refreshNodes", new UsageInfo("[-g [timeout in seconds]]",
.put("-refreshNodes",
new UsageInfo("[-g [timeout in seconds] -client|server]",
"Refresh the hosts information at the ResourceManager. Here "
+ "[-g [timeout in seconds] is optional, if we specify the "
+ "timeout then ResourceManager will wait for timeout before "
+ "marking the NodeManager as decommissioned."))
+ "[-g [timeout in seconds] -client|server] is optional, if we "
+ "specify the timeout then ResourceManager will wait for "
+ "timeout before marking the NodeManager as decommissioned."
+ " The -client|server indicates if the timeout tracking should"
+ " be handled by the client or the ResourceManager. The client"
+ "-side tracking is blocking, while the server-side tracking"
+ " is not. Omitting the timeout, or a timeout of -1, indicates"
+ " an infinite timeout."))
.put("-refreshNodesResources", new UsageInfo("",
"Refresh resources of NodeManagers at the ResourceManager."))
.put("-refreshSuperUserGroupsConfiguration", new UsageInfo("",
@ -230,7 +236,7 @@ private static void printHelp(String cmd, boolean isHAEnabled) {
summary.append("The full syntax is: \n\n" +
"yarn rmadmin" +
" [-refreshQueues]" +
" [-refreshNodes [-g [timeout in seconds]]]" +
" [-refreshNodes [-g [timeout in seconds] -client|server]]" +
" [-refreshNodesResources]" +
" [-refreshSuperUserGroupsConfiguration]" +
" [-refreshUserToGroupsMappings]" +
@ -312,7 +318,12 @@ private int refreshNodes() throws IOException, YarnException {
return 0;
}
private int refreshNodes(long timeout) throws IOException, YarnException {
private int refreshNodes(long timeout, String trackingMode)
throws IOException, YarnException {
if (!"client".equals(trackingMode)) {
throw new UnsupportedOperationException(
"Only client tracking mode is currently supported.");
}
// Graceful decommissioning with timeout
ResourceManagerAdministrationProtocol adminProtocol = createAdminProtocol();
RefreshNodesRequest gracefulRequest = RefreshNodesRequest
@ -721,11 +732,18 @@ public int run(String[] args) throws Exception {
} else if ("-refreshNodes".equals(cmd)) {
if (args.length == 1) {
exitCode = refreshNodes();
} else if (args.length == 3) {
} else if (args.length == 3 || args.length == 4) {
// if the graceful timeout specified
if ("-g".equals(args[1])) {
long timeout = validateTimeout(args[2]);
exitCode = refreshNodes(timeout);
long timeout = -1;
String trackingMode;
if (args.length == 4) {
timeout = validateTimeout(args[2]);
trackingMode = validateTrackingMode(args[3]);
} else {
trackingMode = validateTrackingMode(args[2]);
}
exitCode = refreshNodes(timeout, trackingMode);
} else {
printUsage(cmd, isHAEnabled);
return -1;
@ -838,6 +856,16 @@ private long validateTimeout(String strTimeout) {
return timeout;
}
/**
 * Translates the tracking-mode command-line flag into its internal name.
 *
 * @param mode the raw argument, expected to be "-client" or "-server"
 * @return "client" for "-client", "server" for "-server"
 * @throws IllegalArgumentException if mode is any other value (including null)
 */
private String validateTrackingMode(String mode) {
  final boolean clientSide = "-client".equals(mode);
  // Anything that is neither of the two recognised flags is rejected.
  if (!clientSide && !"-server".equals(mode)) {
    throw new IllegalArgumentException("Invalid mode specified: " + mode);
  }
  return clientSide ? "client" : "server";
}
@Override
public void setConf(Configuration conf) {
if (conf != null) {

View File

@ -26,6 +26,7 @@
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.argThat;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
@ -255,9 +256,9 @@ public void testRefreshNodes() throws Exception {
}
@Test
public void testRefreshNodesWithGracefulTimeout() throws Exception {
public void testRefreshNodesGracefulBeforeTimeout() throws Exception {
// graceful decommission before timeout
String[] args = { "-refreshNodes", "-g", "1" };
String[] args = {"-refreshNodes", "-g", "1", "-client"};
CheckForDecommissioningNodesResponse response = Records
.newRecord(CheckForDecommissioningNodesResponse.class);
HashSet<NodeId> decomNodes = new HashSet<NodeId>();
@ -267,30 +268,91 @@ public void testRefreshNodesWithGracefulTimeout() throws Exception {
assertEquals(0, rmAdminCLI.run(args));
verify(admin).refreshNodes(
RefreshNodesRequest.newInstance(DecommissionType.GRACEFUL));
verify(admin, never()).refreshNodes(
RefreshNodesRequest.newInstance(DecommissionType.FORCEFUL));
}
@Test
public void testRefreshNodesGracefulHitTimeout() throws Exception {
// Forceful decommission when timeout occurs
String[] focefulDecomArgs = { "-refreshNodes", "-g", "1" };
decomNodes = new HashSet<NodeId>();
String[] forcefulDecomArgs = {"-refreshNodes", "-g", "1", "-client"};
HashSet<NodeId> decomNodes = new HashSet<NodeId>();
CheckForDecommissioningNodesResponse response = Records
.newRecord(CheckForDecommissioningNodesResponse.class);
response.setDecommissioningNodes(decomNodes);
decomNodes.add(NodeId.newInstance("node1", 100));
response.setDecommissioningNodes(decomNodes);
when(admin.checkForDecommissioningNodes(any(
CheckForDecommissioningNodesRequest.class))).thenReturn(response);
assertEquals(0, rmAdminCLI.run(focefulDecomArgs));
assertEquals(0, rmAdminCLI.run(forcefulDecomArgs));
verify(admin).refreshNodes(
RefreshNodesRequest.newInstance(DecommissionType.FORCEFUL));
}
/**
 * Graceful refresh with an explicit timeout of -1 and client-side tracking;
 * delegates to the shared infinite-timeout scenario.
 */
@Test
public void testRefreshNodesGracefulInfiniteTimeout() throws Exception {
  // explicit -1 timeout
  testRefreshNodesGracefulInfiniteTimeout(
      new String[] {"-refreshNodes", "-g", "-1", "-client"});
}
/**
 * Graceful refresh with the timeout argument omitted and client-side
 * tracking; delegates to the shared infinite-timeout scenario.
 */
@Test
public void testRefreshNodesGracefulNoTimeout() throws Exception {
  // timeout left out entirely, only the tracking mode follows -g
  testRefreshNodesGracefulInfiniteTimeout(
      new String[] {"-refreshNodes", "-g", "-client"});
}
/**
 * Shared scenario for the graceful-refresh tests that run without a finite
 * timeout. The mocked admin reports "node1" as still decommissioning for the
 * first four polls and an empty set from the fifth poll on. The CLI run must
 * return 0, issue a GRACEFUL refresh at least once, and never issue a
 * FORCEFUL refresh.
 *
 * @param args the rmadmin command line to execute
 */
private void testRefreshNodesGracefulInfiniteTimeout(String[] args)
    throws Exception {
  Answer<CheckForDecommissioningNodesResponse> emptyFromFifthPoll =
      new Answer<CheckForDecommissioningNodesResponse>() {
        // Polls left before the decommissioning set is reported empty.
        private int pollsLeft = 5;

        @Override
        public CheckForDecommissioningNodesResponse answer(
            InvocationOnMock invocation) throws Throwable {
          pollsLeft--;
          HashSet<NodeId> pending = new HashSet<NodeId>();
          if (pollsLeft > 0) {
            pending.add(NodeId.newInstance("node1", 100));
          }
          CheckForDecommissioningNodesResponse reply = Records
              .newRecord(CheckForDecommissioningNodesResponse.class);
          reply.setDecommissioningNodes(pending);
          return reply;
        }
      };
  when(admin.checkForDecommissioningNodes(any(
      CheckForDecommissioningNodesRequest.class)))
      .thenAnswer(emptyFromFifthPoll);

  int exitCode = rmAdminCLI.run(args);
  assertEquals(0, exitCode);

  verify(admin, atLeastOnce()).refreshNodes(
      RefreshNodesRequest.newInstance(DecommissionType.GRACEFUL));
  verify(admin, never()).refreshNodes(
      RefreshNodesRequest.newInstance(DecommissionType.FORCEFUL));
}
@Test
public void testRefreshNodesGracefulInvalidArgs() throws Exception {
// invalid graceful timeout parameter
String[] invalidArgs = { "-refreshNodes", "-ginvalid", "invalid" };
String[] invalidArgs = {"-refreshNodes", "-ginvalid", "invalid", "-client"};
assertEquals(-1, rmAdminCLI.run(invalidArgs));
// invalid timeout
String[] invalidTimeoutArgs = { "-refreshNodes", "-g", "invalid" };
String[] invalidTimeoutArgs = {"-refreshNodes", "-g", "invalid", "-client"};
assertEquals(-1, rmAdminCLI.run(invalidTimeoutArgs));
// negative timeout
String[] negativeTimeoutArgs = { "-refreshNodes", "-g", "-1000" };
String[] negativeTimeoutArgs = {"-refreshNodes", "-g", "-1000", "-client"};
assertEquals(-1, rmAdminCLI.run(negativeTimeoutArgs));
// server tracking mode
String[] serveTrackingrArgs = {"-refreshNodes", "-g", "1", "-server"};
assertEquals(-1, rmAdminCLI.run(serveTrackingrArgs));
// invalid tracking mode
String[] invalidTrackingArgs = {"-refreshNodes", "-g", "1", "-foo"};
assertEquals(-1, rmAdminCLI.run(invalidTrackingArgs));
}
@Test(timeout=500)
@ -404,8 +466,8 @@ public void testHelp() throws Exception {
.toString()
.contains(
"yarn rmadmin [-refreshQueues] [-refreshNodes [-g [timeout in " +
"seconds]]] [-refreshNodesResources] [-refreshSuperUserGroups" +
"Configuration] [-refreshUserToGroupsMappings] " +
"seconds] -client|server]] [-refreshNodesResources] [-refresh" +
"SuperUserGroupsConfiguration] [-refreshUserToGroupsMappings] " +
"[-refreshAdminAcls] [-refreshServiceAcl] [-getGroup " +
"[username]] [-addToClusterNodeLabels " +
"<\"label1(exclusive=true),label2(exclusive=false),label3\">] " +
@ -423,8 +485,8 @@ public void testHelp() throws Exception {
assertTrue(dataOut
.toString()
.contains(
"-refreshNodes [-g [timeout in seconds]]: Refresh the hosts information at the " +
"ResourceManager."));
"-refreshNodes [-g [timeout in seconds] -client|server]: " +
"Refresh the hosts information at the ResourceManager."));
assertTrue(dataOut
.toString()
.contains(
@ -456,7 +518,8 @@ public void testHelp() throws Exception {
testError(new String[] { "-help", "-refreshQueues" },
"Usage: yarn rmadmin [-refreshQueues]", dataErr, 0);
testError(new String[] { "-help", "-refreshNodes" },
"Usage: yarn rmadmin [-refreshNodes [-g [timeout in seconds]]]", dataErr, 0);
"Usage: yarn rmadmin [-refreshNodes [-g [timeout in seconds] " +
"-client|server]]", dataErr, 0);
testError(new String[] { "-help", "-refreshNodesResources" },
"Usage: yarn rmadmin [-refreshNodesResources]", dataErr, 0);
testError(new String[] { "-help", "-refreshUserToGroupsMappings" },
@ -495,7 +558,8 @@ public void testHelp() throws Exception {
assertEquals(0, rmAdminCLIWithHAEnabled.run(args));
oldOutPrintStream.println(dataOut);
String expectedHelpMsg =
"yarn rmadmin [-refreshQueues] [-refreshNodes [-g [timeout in seconds]]] "
"yarn rmadmin [-refreshQueues] [-refreshNodes [-g [timeout in "
+ "seconds] -client|server]] "
+ "[-refreshNodesResources] [-refreshSuperUserGroupsConfiguration] "
+ "[-refreshUserToGroupsMappings] "
+ "[-refreshAdminAcls] [-refreshServiceAcl] [-getGroup"