HDFS-10403. DiskBalancer: Add cancel command. Contributed by Anu Engineer.

This commit is contained in:
Anu Engineer 2016-05-23 14:51:35 -07:00 committed by Arpit Agarwal
parent 9e5fcb5e40
commit 43eee50966
4 changed files with 177 additions and 14 deletions
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs

@ -0,0 +1,137 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.hadoop.hdfs.server.diskbalancer.command;
import com.google.common.base.Preconditions;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.server.diskbalancer.DiskBalancerException;
import org.apache.hadoop.hdfs.server.diskbalancer.planner.NodePlan;
import org.apache.hadoop.hdfs.tools.DiskBalancer;
import java.io.IOException;
/**
* Cancels a running plan.
*/
public class CancelCommand extends Command {
/**
* Contructs a cancel Command.
*
* @param conf - Conf
*/
public CancelCommand(Configuration conf) {
super(conf);
addValidCommandParameters(DiskBalancer.CANCEL, "Cancels a running plan.");
addValidCommandParameters(DiskBalancer.NODE, "Node to run the command " +
"against in node:port format.");
}
/**
* Executes the Client Calls.
*
* @param cmd - CommandLine
*/
@Override
public void execute(CommandLine cmd) throws Exception {
LOG.info("Executing \"Cancel plan\" command.");
Preconditions.checkState(cmd.hasOption(DiskBalancer.CANCEL));
verifyCommandOptions(DiskBalancer.CANCEL, cmd);
// We can cancel a plan using datanode address and plan ID
// that you can read from a datanode using queryStatus
if(cmd.hasOption(DiskBalancer.NODE)) {
String nodeAddress = cmd.getOptionValue(DiskBalancer.NODE);
String planHash = cmd.getOptionValue(DiskBalancer.CANCEL);
cancelPlanUsingHash(nodeAddress, planHash);
} else {
// Or you can cancel a plan using the plan file. If the user
// points us to the plan file, we can compute the hash as well as read
// the address of the datanode from the plan file.
String planFile = cmd.getOptionValue(DiskBalancer.CANCEL);
Preconditions.checkArgument(planFile == null || planFile.isEmpty(),
"Invalid plan file specified.");
String planData = null;
try (FSDataInputStream plan = open(planFile)) {
planData = IOUtils.toString(plan);
}
cancelPlan(planData);
}
}
/**
* Cancels a running plan.
*
* @param planData - Plan data.
* @throws IOException
*/
private void cancelPlan(String planData) throws IOException {
Preconditions.checkNotNull(planData);
NodePlan plan = readPlan(planData);
String dataNodeAddress = plan.getNodeName() + ":" + plan.getPort();
Preconditions.checkNotNull(dataNodeAddress);
ClientDatanodeProtocol dataNode = getDataNodeProxy(dataNodeAddress);
String planHash = DigestUtils.sha512Hex(planData);
try {
dataNode.cancelDiskBalancePlan(planHash);
} catch (DiskBalancerException ex) {
LOG.error("Cancelling plan on {} failed. Result: {}, Message: {}",
plan.getNodeName(), ex.getResult().toString(), ex.getMessage());
throw ex;
}
}
/**
* Cancels a running plan.
* @param nodeAddress - Address of the data node.
* @param hash - Sha512 hash of the plan, which can be read from datanode
* using query status command.
* @throws IOException
*/
private void cancelPlanUsingHash(String nodeAddress, String hash) throws
IOException {
Preconditions.checkNotNull(nodeAddress);
Preconditions.checkNotNull(hash);
ClientDatanodeProtocol dataNode = getDataNodeProxy(nodeAddress);
try {
dataNode.cancelDiskBalancePlan(hash);
} catch (DiskBalancerException ex) {
LOG.error("Cancelling plan on {} failed. Result: {}, Message: {}",
nodeAddress, ex.getResult().toString(), ex.getMessage());
throw ex;
}
}
/**
* Gets extended help for this command.
*
* @return Help Message
*/
@Override
protected String getHelp() {
return "Cancels a running command. e.g -cancel <PlanFile> or -cancel " +
"<planID> -node <datanode>";
}
}

@ -31,6 +31,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.server.diskbalancer.planner.NodePlan;
import org.apache.hadoop.hdfs.tools.DiskBalancer;
import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ClusterConnector;
import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ConnectorFactory;
@ -39,6 +40,7 @@ import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerDataNode
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.htrace.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -392,4 +394,16 @@ public abstract class Command extends Configured {
protected DiskBalancerCluster getCluster() {
return cluster;
}
/**
* Returns a plan from the Json Data.
*
* @param planData - Json String
* @return NodePlan
* @throws IOException
*/
protected NodePlan readPlan(String planData) throws IOException {
ObjectMapper mapper = new ObjectMapper();
return mapper.readValue(planData, NodePlan.class);
}
}

@ -27,9 +27,9 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.server.diskbalancer.DiskBalancerException;
import org.apache.hadoop.hdfs.tools.DiskBalancer;
import org.apache.hadoop.hdfs.server.diskbalancer.planner.NodePlan;
import org.apache.htrace.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hadoop.hdfs.tools.DiskBalancer;
import java.io.IOException;
@ -94,17 +94,7 @@ public class ExecuteCommand extends Command {
}
}
/**
* Returns a plan from the Json Data.
*
* @param planData - Json String
* @return NodePlan
* @throws IOException
*/
private NodePlan readPlan(String planData) throws IOException {
ObjectMapper mapper = new ObjectMapper();
return mapper.readValue(planData, NodePlan.class);
}
/**
* Gets extended help for this command.

@ -24,6 +24,7 @@ import org.apache.commons.cli.Options;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.diskbalancer.command.CancelCommand;
import org.apache.hadoop.hdfs.server.diskbalancer.command.Command;
import org.apache.hadoop.hdfs.server.diskbalancer.command.ExecuteCommand;
import org.apache.hadoop.hdfs.server.diskbalancer.command.PlanCommand;
@ -105,7 +106,10 @@ public class DiskBalancer extends Configured implements Tool {
* Reports the status of disk balancer operation.
*/
public static final String QUERY = "query";
/**
* Cancels a running plan.
*/
public static final String CANCEL = "cancel";
/**
* Template for the Before File. It is node.before.json.
*/
@ -168,6 +172,7 @@ public class DiskBalancer extends Configured implements Tool {
addPlanCommands(opts);
addExecuteCommands(opts);
addQueryCommands(opts);
addCancelCommands(opts);
return opts;
}
@ -233,6 +238,19 @@ public class DiskBalancer extends Configured implements Tool {
opt.addOption(query);
}
/**
* Adds cancel command options.
* @param opt Options
*/
private void addCancelCommands(Options opt) {
Option cancel = new Option(CANCEL, true, "Cancels a running plan. -cancel" +
" <planFile> or -cancel <planID> -node <datanode:port>");
opt.addOption(cancel);
Option node = new Option(NODE, true, "Name of the datanode in name:port " +
"format");
opt.addOption(node);
}
/**
* This function parses all command line arguments and returns the appropriate
* values.
@ -271,6 +289,10 @@ public class DiskBalancer extends Configured implements Tool {
currentCommand = new QueryCommand(getConf());
}
if(cmd.hasOption(DiskBalancer.CANCEL)) {
currentCommand = new CancelCommand(getConf());
}
if(currentCommand == null) {
HelpFormatter helpFormatter = new HelpFormatter();
helpFormatter.printHelp(80, "hdfs diskbalancer -uri [args]",