HDFS-13181. DiskBalancer: Add an configuration for valid plan hours . Contributed by Bharat Viswanadham.
This commit is contained in:
parent
137f0d324a
commit
1cc9a58dda
@ -1157,6 +1157,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
||||
"dfs.disk.balancer.max.disk.errors";
|
||||
public static final int DFS_DISK_BALANCER_MAX_DISK_ERRORS_DEFAULT = 5;
|
||||
|
||||
public static final String DFS_DISK_BALANCER_PLAN_VALID_INTERVAL =
|
||||
"dfs.disk.balancer.plan.valid.interval";
|
||||
public static final String DFS_DISK_BALANCER_PLAN_VALID_INTERVAL_DEFAULT =
|
||||
"1d";
|
||||
|
||||
|
||||
public static final String DFS_DISK_BALANCER_BLOCK_TOLERANCE =
|
||||
"dfs.disk.balancer.block.tolerance.percent";
|
||||
|
@ -91,6 +91,8 @@ public class DiskBalancer {
|
||||
private String planFile;
|
||||
private DiskBalancerWorkStatus.Result currentResult;
|
||||
private long bandwidth;
|
||||
private long planValidityInterval;
|
||||
private final Configuration config;
|
||||
|
||||
/**
|
||||
* Constructs a Disk Balancer object. This object takes care of reading a
|
||||
@ -102,6 +104,7 @@ public class DiskBalancer {
|
||||
*/
|
||||
public DiskBalancer(String dataNodeUUID,
|
||||
Configuration conf, BlockMover blockMover) {
|
||||
this.config = conf;
|
||||
this.currentResult = Result.NO_PLAN;
|
||||
this.blockMover = blockMover;
|
||||
this.dataset = this.blockMover.getDataset();
|
||||
@ -117,6 +120,10 @@ public class DiskBalancer {
|
||||
this.bandwidth = conf.getInt(
|
||||
DFSConfigKeys.DFS_DISK_BALANCER_MAX_DISK_THROUGHPUT,
|
||||
DFSConfigKeys.DFS_DISK_BALANCER_MAX_DISK_THROUGHPUT_DEFAULT);
|
||||
this.planValidityInterval = conf.getTimeDuration(
|
||||
DFSConfigKeys.DFS_DISK_BALANCER_PLAN_VALID_INTERVAL,
|
||||
DFSConfigKeys.DFS_DISK_BALANCER_PLAN_VALID_INTERVAL_DEFAULT,
|
||||
TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -417,15 +424,17 @@ public class DiskBalancer {
|
||||
long now = Time.now();
|
||||
long planTime = plan.getTimeStamp();
|
||||
|
||||
// TODO : Support Valid Plan hours as a user configurable option.
|
||||
if ((planTime +
|
||||
(TimeUnit.HOURS.toMillis(
|
||||
DiskBalancerConstants.DISKBALANCER_VALID_PLAN_HOURS))) < now) {
|
||||
String hourString = "Plan was generated more than " +
|
||||
Integer.toString(DiskBalancerConstants.DISKBALANCER_VALID_PLAN_HOURS)
|
||||
+ " hours ago.";
|
||||
LOG.error("Disk Balancer - " + hourString);
|
||||
throw new DiskBalancerException(hourString,
|
||||
if ((planTime + planValidityInterval) < now) {
|
||||
String planValidity = config.get(
|
||||
DFSConfigKeys.DFS_DISK_BALANCER_PLAN_VALID_INTERVAL,
|
||||
DFSConfigKeys.DFS_DISK_BALANCER_PLAN_VALID_INTERVAL_DEFAULT);
|
||||
if (planValidity.matches("[0-9]$")) {
|
||||
planValidity += "ms";
|
||||
}
|
||||
String errorString = "Plan was generated more than " + planValidity
|
||||
+ " ago";
|
||||
LOG.error("Disk Balancer - " + errorString);
|
||||
throw new DiskBalancerException(errorString,
|
||||
DiskBalancerException.Result.OLD_PLAN_SUBMITTED);
|
||||
}
|
||||
}
|
||||
|
@ -33,11 +33,6 @@ public final class DiskBalancerConstants {
|
||||
public static final int DISKBALANCER_MIN_VERSION = 1;
|
||||
public static final int DISKBALANCER_MAX_VERSION = 1;
|
||||
|
||||
/**
|
||||
* We treat a plan as stale if it was generated before the hours
|
||||
* defined by the constant below. Defaults to 24 hours.
|
||||
*/
|
||||
public static final int DISKBALANCER_VALID_PLAN_HOURS = 24;
|
||||
// never constructed.
|
||||
private DiskBalancerConstants() {
|
||||
}
|
||||
|
@ -4630,6 +4630,17 @@
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.disk.balancer.plan.valid.interval</name>
|
||||
<value>1d</value>
|
||||
<description>
|
||||
Maximum number of hours the disk balancer plan is valid.
|
||||
This setting supports multiple time unit suffixes as described
|
||||
in dfs.heartbeat.interval. If no suffix is specified then milliseconds
|
||||
is assumed.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
|
||||
<property>
|
||||
<name>dfs.disk.balancer.enabled</name>
|
||||
|
@ -18,6 +18,7 @@
|
||||
package org.apache.hadoop.hdfs.server.diskbalancer.command;
|
||||
|
||||
|
||||
import static java.lang.Thread.sleep;
|
||||
import static org.apache.hadoop.hdfs.tools.DiskBalancerCLI.CANCEL;
|
||||
import static org.apache.hadoop.hdfs.tools.DiskBalancerCLI.EXECUTE;
|
||||
import static org.apache.hadoop.hdfs.tools.DiskBalancerCLI.HELP;
|
||||
@ -58,6 +59,7 @@ import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerDataNode
|
||||
import org.apache.hadoop.hdfs.tools.DiskBalancerCLI;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.hadoop.test.LambdaTestUtils;
|
||||
import org.apache.hadoop.test.PathUtils;
|
||||
import org.apache.hadoop.util.Tool;
|
||||
import org.apache.hadoop.util.ToolRunner;
|
||||
@ -187,6 +189,132 @@ public class TestDiskBalancerCommand {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
@Test(timeout = 600000)
|
||||
public void testDiskBalancerExecuteOptionPlanValidityWithException() throws
|
||||
Exception {
|
||||
final int numDatanodes = 1;
|
||||
|
||||
final Configuration hdfsConf = new HdfsConfiguration();
|
||||
hdfsConf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true);
|
||||
hdfsConf.set(DFSConfigKeys.DFS_DISK_BALANCER_PLAN_VALID_INTERVAL, "0d");
|
||||
|
||||
/* new cluster with imbalanced capacity */
|
||||
final MiniDFSCluster miniCluster = DiskBalancerTestUtil.
|
||||
newImbalancedCluster(
|
||||
hdfsConf,
|
||||
numDatanodes,
|
||||
CAPACITIES,
|
||||
DEFAULT_BLOCK_SIZE,
|
||||
FILE_LEN);
|
||||
|
||||
try {
|
||||
/* get full path of plan */
|
||||
final String planFileFullName = runAndVerifyPlan(miniCluster, hdfsConf);
|
||||
|
||||
/* run execute command */
|
||||
final String cmdLine = String.format(
|
||||
"hdfs diskbalancer -%s %s",
|
||||
EXECUTE,
|
||||
planFileFullName);
|
||||
|
||||
LambdaTestUtils.intercept(
|
||||
RemoteException.class,
|
||||
"DiskBalancerException",
|
||||
"Plan was generated more than 0d ago",
|
||||
() -> {
|
||||
runCommand(cmdLine, hdfsConf, miniCluster);
|
||||
});
|
||||
} finally{
|
||||
if (miniCluster != null) {
|
||||
miniCluster.shutdown();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 600000)
|
||||
public void testDiskBalancerExecutePlanValidityWithOutUnitException()
|
||||
throws
|
||||
Exception {
|
||||
final int numDatanodes = 1;
|
||||
|
||||
final Configuration hdfsConf = new HdfsConfiguration();
|
||||
hdfsConf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true);
|
||||
hdfsConf.set(DFSConfigKeys.DFS_DISK_BALANCER_PLAN_VALID_INTERVAL, "0");
|
||||
|
||||
/* new cluster with imbalanced capacity */
|
||||
final MiniDFSCluster miniCluster = DiskBalancerTestUtil.
|
||||
newImbalancedCluster(
|
||||
hdfsConf,
|
||||
numDatanodes,
|
||||
CAPACITIES,
|
||||
DEFAULT_BLOCK_SIZE,
|
||||
FILE_LEN);
|
||||
|
||||
try {
|
||||
/* get full path of plan */
|
||||
final String planFileFullName = runAndVerifyPlan(miniCluster, hdfsConf);
|
||||
|
||||
/* run execute command */
|
||||
final String cmdLine = String.format(
|
||||
"hdfs diskbalancer -%s %s",
|
||||
EXECUTE,
|
||||
planFileFullName);
|
||||
|
||||
LambdaTestUtils.intercept(
|
||||
RemoteException.class,
|
||||
"DiskBalancerException",
|
||||
"Plan was generated more than 0ms ago",
|
||||
() -> {
|
||||
runCommand(cmdLine, hdfsConf, miniCluster);
|
||||
});
|
||||
} finally{
|
||||
if (miniCluster != null) {
|
||||
miniCluster.shutdown();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Test(timeout = 600000)
|
||||
public void testDiskBalancerExecuteOptionPlanValidity() throws Exception {
|
||||
final int numDatanodes = 1;
|
||||
|
||||
final Configuration hdfsConf = new HdfsConfiguration();
|
||||
hdfsConf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true);
|
||||
hdfsConf.set(DFSConfigKeys.DFS_DISK_BALANCER_PLAN_VALID_INTERVAL, "600s");
|
||||
|
||||
/* new cluster with imbalanced capacity */
|
||||
final MiniDFSCluster miniCluster = DiskBalancerTestUtil.
|
||||
newImbalancedCluster(
|
||||
hdfsConf,
|
||||
numDatanodes,
|
||||
CAPACITIES,
|
||||
DEFAULT_BLOCK_SIZE,
|
||||
FILE_LEN);
|
||||
|
||||
try {
|
||||
/* get full path of plan */
|
||||
final String planFileFullName = runAndVerifyPlan(miniCluster, hdfsConf);
|
||||
|
||||
/* run execute command */
|
||||
final String cmdLine = String.format(
|
||||
"hdfs diskbalancer -%s %s",
|
||||
EXECUTE,
|
||||
planFileFullName);
|
||||
|
||||
// Plan is valid for 600 seconds, sleeping for 10seconds, so now
|
||||
// diskbalancer should execute the plan
|
||||
sleep(10000);
|
||||
runCommand(cmdLine, hdfsConf, miniCluster);
|
||||
} finally{
|
||||
if (miniCluster != null) {
|
||||
miniCluster.shutdown();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private String runAndVerifyPlan(
|
||||
final MiniDFSCluster miniCluster,
|
||||
final Configuration hdfsConf) throws Exception {
|
||||
|
Loading…
x
Reference in New Issue
Block a user