diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
index d1bc1f15b8..972f0fce00 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
@@ -40,7 +40,11 @@
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;
-import java.util.concurrent.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
/**
@@ -106,6 +110,7 @@ public void shutdown() {
this.isDiskBalancerEnabled = false;
this.currentResult = Result.NO_PLAN;
if ((this.future != null) && (!this.future.isDone())) {
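+ // An in-flight plan is being interrupted; report its final state as cancelled.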
+ this.currentResult = Result.PLAN_CANCELLED;
this.blockMover.setExitFlag();
shutdownExecutor();
}
@@ -120,9 +125,9 @@ public void shutdown() {
private void shutdownExecutor() {
scheduler.shutdown();
try {
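+ // Give the scheduler a short grace period before forcing shutdown.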
- if(!scheduler.awaitTermination(30, TimeUnit.SECONDS)) {
+ if (!scheduler.awaitTermination(10, TimeUnit.SECONDS)) {
scheduler.shutdownNow();
- if (!scheduler.awaitTermination(30, TimeUnit.SECONDS)) {
+ if (!scheduler.awaitTermination(10, TimeUnit.SECONDS)) {
LOG.error("Disk Balancer : Scheduler did not terminate.");
}
}
@@ -218,6 +223,7 @@ public void cancelPlan(String planID) throws DiskBalancerException {
if (!this.future.isDone()) {
this.blockMover.setExitFlag();
shutdownExecutor();
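+ // The mover has stopped; record that the plan ended by cancellation.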
+ this.currentResult = Result.PLAN_CANCELLED;
}
} finally {
lock.unlock();
@@ -537,7 +543,7 @@ public interface BlockMover {
/**
* Holds references to actual volumes that we will be operating against.
*/
- static class VolumePair {
+ public static class VolumePair {
private final FsVolumeSpi source;
private final FsVolumeSpi dest;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskBalancerResultVerifier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskBalancerResultVerifier.java
new file mode 100644
index 0000000000..5abb33cc39
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskBalancerResultVerifier.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdfs.server.diskbalancer;
+
+import org.hamcrest.Description;
+import org.hamcrest.TypeSafeMatcher;
+
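+/**
+ * Hamcrest matcher that checks whether a DiskBalancerException carries the
+ * expected Result code. Typical use with JUnit's ExpectedException rule:
+ *
+ *   thrown.expect(new DiskBalancerResultVerifier(Result.NO_SUCH_PLAN));
+ */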
+public class DiskBalancerResultVerifier
+ extends TypeSafeMatcher<DiskBalancerException> {
+ private final DiskBalancerException.Result expectedResult;
+
+ DiskBalancerResultVerifier(DiskBalancerException.Result expectedResult) {
+ this.expectedResult = expectedResult;
+ }
+
+ @Override
+ protected boolean matchesSafely(DiskBalancerException exception) {
+ return (this.expectedResult == exception.getResult());
+ }
+
+ @Override
+ public void describeTo(Description description) {
+ description.appendText("expects Result: ")
+ .appendValue(this.expectedResult);
+
+ }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerRPC.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerRPC.java
index 9cd41c248a..a65ed21e4f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerRPC.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerRPC.java
@@ -90,8 +90,7 @@ public void testSubmitPlanWithInvalidHash() throws Exception {
int planVersion = rpcTestHelper.getPlanVersion();
NodePlan plan = rpcTestHelper.getPlan();
thrown.expect(DiskBalancerException.class);
- thrown.expect(new
- ResultVerifier(Result.INVALID_PLAN_HASH));
+ thrown.expect(new DiskBalancerResultVerifier(Result.INVALID_PLAN_HASH));
dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, plan.toJson());
}
@@ -104,8 +103,7 @@ public void testSubmitPlanWithInvalidVersion() throws Exception {
planVersion++;
NodePlan plan = rpcTestHelper.getPlan();
thrown.expect(DiskBalancerException.class);
- thrown.expect(new
- ResultVerifier(Result.INVALID_PLAN_VERSION));
+ thrown.expect(new DiskBalancerResultVerifier(Result.INVALID_PLAN_VERSION));
dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, plan.toJson());
}
@@ -117,8 +115,7 @@ public void testSubmitPlanWithInvalidPlan() throws Exception {
int planVersion = rpcTestHelper.getPlanVersion();
NodePlan plan = rpcTestHelper.getPlan();
thrown.expect(DiskBalancerException.class);
- thrown.expect(new
- ResultVerifier(Result.INVALID_PLAN));
+ thrown.expect(new DiskBalancerResultVerifier(Result.INVALID_PLAN));
dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, "");
}
@@ -143,8 +140,7 @@ public void testCancelNonExistentPlan() throws Exception {
planHash = String.valueOf(hashArray);
NodePlan plan = rpcTestHelper.getPlan();
thrown.expect(DiskBalancerException.class);
- thrown.expect(new
- ResultVerifier(Result.NO_SUCH_PLAN));
+ thrown.expect(new DiskBalancerResultVerifier(Result.NO_SUCH_PLAN));
dataNode.cancelDiskBalancePlan(planHash);
}
@@ -155,8 +151,7 @@ public void testCancelEmptyPlan() throws Exception {
String planHash = "";
NodePlan plan = rpcTestHelper.getPlan();
thrown.expect(DiskBalancerException.class);
- thrown.expect(new
- ResultVerifier(Result.NO_SUCH_PLAN));
+ thrown.expect(new DiskBalancerResultVerifier(Result.NO_SUCH_PLAN));
dataNode.cancelDiskBalancePlan(planHash);
}
@@ -182,8 +177,7 @@ public void testGetDiskBalancerInvalidSetting() throws Exception {
final String invalidSetting = "invalidSetting";
DataNode dataNode = cluster.getDataNodes().get(dnIndex);
thrown.expect(DiskBalancerException.class);
- thrown.expect(new
- ResultVerifier(Result.UNKNOWN_KEY));
+ thrown.expect(new DiskBalancerResultVerifier(Result.UNKNOWN_KEY));
dataNode.getDiskBalancerSetting(invalidSetting);
}
@@ -274,25 +268,4 @@ public RpcTestHelper invoke() throws Exception {
return this;
}
}
-
- private class ResultVerifier
- extends TypeSafeMatcher<DiskBalancerException> {
- private final DiskBalancerException.Result expectedResult;
-
- ResultVerifier(DiskBalancerException.Result expectedResult){
- this.expectedResult = expectedResult;
- }
-
- @Override
- protected boolean matchesSafely(DiskBalancerException exception) {
- return (this.expectedResult == exception.getResult());
- }
-
- @Override
- public void describeTo(Description description) {
- description.appendText("expects Result: ")
- .appendValue(this.expectedResult);
-
- }
- }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerWithMockMover.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerWithMockMover.java
new file mode 100644
index 0000000000..ed761edceb
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerWithMockMover.java
@@ -0,0 +1,570 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.hdfs.server.diskbalancer;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DiskBalancer;
+import org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkItem;
+import org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ClusterConnector;
+import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ConnectorFactory;
+import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerCluster;
+import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerDataNode;
+import org.apache.hadoop.hdfs.server.diskbalancer.planner.GreedyPlanner;
+import org.apache.hadoop.hdfs.server.diskbalancer.planner.MoveStep;
+import org.apache.hadoop.hdfs.server.diskbalancer.planner.NodePlan;
+import org.apache.hadoop.hdfs.server.diskbalancer.planner.Step;
+import org.apache.hadoop.util.Time;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus.Result.NO_PLAN;
+import static org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus.Result.PLAN_DONE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestDiskBalancerWithMockMover {
+ static final Log LOG = LogFactory.getLog(TestDiskBalancerWithMockMover.class);
+
+ @Rule
+ public ExpectedException thrown = ExpectedException.none();
+
+ MiniDFSCluster cluster;
+ String sourceName;
+ String destName;
+ String sourceUUID;
+ String destUUID;
+ String nodeID;
+ DataNode dataNode;
+
+ /**
+ * Checks that we return the right error if the disk balancer is not enabled.
+ */
+ @Test
+ public void testDiskBalancerDisabled() throws Exception {
+ Configuration conf = new HdfsConfiguration();
+ conf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, false);
+ restartDataNode();
+
+ TestMover blockMover = new TestMover(cluster.getDataNodes()
+ .get(0).getFSDataset());
+
+ DiskBalancer balancer = new DiskBalancerBuilder(conf)
+ .setMover(blockMover)
+ .build();
+
+ thrown.expect(DiskBalancerException.class);
+ thrown.expect(new DiskBalancerResultVerifier(DiskBalancerException
+ .Result.DISK_BALANCER_NOT_ENABLED));
+
+ balancer.queryWorkStatus();
+ }
+
+ /**
+ * Checks that the enable flag works correctly.
+ *
+ * @throws DiskBalancerException
+ */
+ @Test
+ public void testDiskBalancerEnabled() throws DiskBalancerException {
+ Configuration conf = new HdfsConfiguration();
+ conf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true);
+
+ TestMover blockMover = new TestMover(cluster.getDataNodes()
+ .get(0).getFSDataset());
+
+ DiskBalancer balancer = new DiskBalancerBuilder(conf)
+ .setMover(blockMover)
+ .build();
+
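+ // With no plan submitted, the query succeeds and reports NO_PLAN.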
+ DiskBalancerWorkStatus status = balancer.queryWorkStatus();
+ assertEquals(NO_PLAN, status.getResult());
+ }
+
+ private void executeSubmitPlan(NodePlan plan, DiskBalancer balancer,
+ int version) throws IOException {
+ String planJson = plan.toJson();
+ String planID = DigestUtils.sha512Hex(planJson);
+ balancer.submitPlan(planID, version, planJson, 10, false);
+ }
+
+ private void executeSubmitPlan(NodePlan plan, DiskBalancer balancer)
+ throws IOException {
+ executeSubmitPlan(plan, balancer, 1);
+ }
+
+ /**
+ * Tests that submitting a second plan while one is executing fails.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testResubmitDiskBalancerPlan() throws Exception {
+ MockMoverHelper mockMoverHelper = new MockMoverHelper().invoke();
+ NodePlan plan = mockMoverHelper.getPlan();
+ DiskBalancer balancer = mockMoverHelper.getBalancer();
+
+ // ask block mover to get stuck in copy block
+ mockMoverHelper.getBlockMover().setSleep();
+ executeSubmitPlan(plan, balancer);
+ thrown.expect(DiskBalancerException.class);
+ thrown.expect(new DiskBalancerResultVerifier(DiskBalancerException
+ .Result.PLAN_ALREADY_IN_PROGRESS));
+ executeSubmitPlan(plan, balancer);
+
+ // Not strictly needed, but perform the cleanup step anyway.
+ mockMoverHelper.getBlockMover().clearSleep();
+ }
+
+ @Test
+ public void testSubmitDiskBalancerPlan() throws Exception {
+ MockMoverHelper mockMoverHelper = new MockMoverHelper().invoke();
+ NodePlan plan = mockMoverHelper.getPlan();
+ DiskBalancer balancer = mockMoverHelper.getBalancer();
+
+ executeSubmitPlan(plan, balancer);
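+ // Poll for up to three seconds for the background move to finish.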
+ int counter = 0;
+ while ((balancer.queryWorkStatus().getResult() != PLAN_DONE) &&
+ (counter < 3)) {
+ Thread.sleep(1000);
+ counter++;
+ }
+
+ // Asserts that submitting the plan caused exactly one execution in the background.
+ assertEquals(1, mockMoverHelper.getBlockMover().getRunCount());
+ }
+
+ @Test
+ public void testSubmitWithOlderPlan() throws Exception {
+ final long MILLISECONDS_IN_AN_HOUR = 1000L * 60 * 60;
+ MockMoverHelper mockMoverHelper = new MockMoverHelper().invoke();
+ NodePlan plan = mockMoverHelper.getPlan();
+ DiskBalancer balancer = mockMoverHelper.getBalancer();
+
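+ // Back-date the plan by 32 hours so it is rejected as too old.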
+ plan.setTimeStamp(Time.now() - (32 * MILLISECONDS_IN_AN_HOUR));
+ thrown.expect(DiskBalancerException.class);
+ thrown.expect(new DiskBalancerResultVerifier(DiskBalancerException
+ .Result.OLD_PLAN_SUBMITTED));
+ executeSubmitPlan(plan, balancer);
+ }
+
+ @Test
+ public void testSubmitWithOldInvalidVersion() throws Exception {
+ MockMoverHelper mockMoverHelper = new MockMoverHelper().invoke();
+ NodePlan plan = mockMoverHelper.getPlan();
+ DiskBalancer balancer = mockMoverHelper.getBalancer();
+
+ thrown.expect(DiskBalancerException.class);
+ thrown.expect(new DiskBalancerResultVerifier(DiskBalancerException
+ .Result.INVALID_PLAN_VERSION));
+
+ // Plan version is invalid -- there is no version 0.
+ executeSubmitPlan(plan, balancer, 0);
+ }
+
+ @Test
+ public void testSubmitWithNullPlan() throws Exception {
+ MockMoverHelper mockMoverHelper = new MockMoverHelper().invoke();
+ NodePlan plan = mockMoverHelper.getPlan();
+ DiskBalancer balancer = mockMoverHelper.getBalancer();
+ String planJson = plan.toJson();
+ String planID = DigestUtils.sha512Hex(planJson);
+
+ thrown.expect(DiskBalancerException.class);
+ thrown.expect(new DiskBalancerResultVerifier(DiskBalancerException
+ .Result.INVALID_PLAN));
+
+ balancer.submitPlan(planID, 1, null, 10, false);
+ }
+
+ @Test
+ public void testSubmitWithInvalidHash() throws Exception {
+ MockMoverHelper mockMoverHelper = new MockMoverHelper().invoke();
+ NodePlan plan = mockMoverHelper.getPlan();
+ DiskBalancer balancer = mockMoverHelper.getBalancer();
+
+ String planJson = plan.toJson();
+ String planID = DigestUtils.sha512Hex(planJson);
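+ // Build a corrupted plan ID by replacing its first character wherever it occurs.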
+ char repChar = planID.charAt(0);
+ repChar++;
+
+ thrown.expect(DiskBalancerException.class);
+ thrown.expect(new DiskBalancerResultVerifier(DiskBalancerException
+ .Result.INVALID_PLAN_HASH));
+ balancer.submitPlan(planID.replace(planID.charAt(0), repChar),
+ 1, planJson, 10, false);
+ }
+
+ /**
+ * Test Cancel Plan.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testCancelDiskBalancerPlan() throws Exception {
+ MockMoverHelper mockMoverHelper = new MockMoverHelper().invoke();
+ NodePlan plan = mockMoverHelper.getPlan();
+ DiskBalancer balancer = mockMoverHelper.getBalancer();
+
+ // ask block mover to delay execution
+ mockMoverHelper.getBlockMover().setSleep();
+ executeSubmitPlan(plan, balancer);
+
+ String planJson = plan.toJson();
+ String planID = DigestUtils.sha512Hex(planJson);
+ balancer.cancelPlan(planID);
+
+ DiskBalancerWorkStatus status = balancer.queryWorkStatus();
+ assertEquals(DiskBalancerWorkStatus.Result.PLAN_CANCELLED,
+ status.getResult());
+
+ executeSubmitPlan(plan, balancer);
+
+ // Send a wrong cancellation request.
+ char first = planID.charAt(0);
+ first++;
+ thrown.expect(DiskBalancerException.class);
+ thrown.expect(new DiskBalancerResultVerifier(DiskBalancerException
+ .Result.NO_SUCH_PLAN));
+ balancer.cancelPlan(planID.replace(planID.charAt(0), first));
+
+ // Now cancel the real one
+ balancer.cancelPlan(planID);
+ mockMoverHelper.getBlockMover().clearSleep(); // unblock mover.
+
+ status = balancer.queryWorkStatus();
+ assertEquals(DiskBalancerWorkStatus.Result.PLAN_CANCELLED,
+ status.getResult());
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ Configuration conf = new HdfsConfiguration();
+ final int NUM_STORAGES_PER_DN = 2;
+ cluster = new MiniDFSCluster
+ .Builder(conf).numDataNodes(3)
+ .storagesPerDatanode(NUM_STORAGES_PER_DN)
+ .build();
+ cluster.waitActive();
+ dataNode = cluster.getDataNodes().get(0);
+ FsDatasetSpi.FsVolumeReferences references = dataNode.getFSDataset()
+ .getFsVolumeReferences();
+
+ nodeID = dataNode.getDatanodeUuid();
+ sourceName = references.get(0).getBasePath();
+ destName = references.get(1).getBasePath();
+ sourceUUID = references.get(0).getStorageID();
+ destUUID = references.get(1).getStorageID();
+ references.close();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+
+ private void restartDataNode() throws IOException {
+ if (cluster != null) {
+ cluster.restartDataNode(0);
+ }
+ }
+
+ /**
+ * Allows us to control the mover class for test purposes.
+ */
+ public static class TestMover implements DiskBalancer.BlockMover {
+
+ private AtomicBoolean shouldRun;
+ private FsDatasetSpi dataset;
+ private final AtomicInteger runCount;
+ private volatile boolean sleepInCopyBlocks;
+ private long delay;
+
+ public TestMover(FsDatasetSpi dataset) {
+ this.dataset = dataset;
+ this.shouldRun = new AtomicBoolean(false);
+ this.runCount = new AtomicInteger(0);
+ }
+
+ public void setSleep() {
+ sleepInCopyBlocks = true;
+ }
+
+ public void clearSleep() {
+ sleepInCopyBlocks = false;
+ }
+
+ public void setDelay(long milliseconds) {
+ this.delay = milliseconds;
+ }
+
+ /**
+ * Copies blocks from a set of volumes.
+ *
+ * @param pair - Source and Destination Volumes.
+ * @param item - Work item describing how many bytes to move.
+ */
+ @Override
+ public void copyBlocks(DiskBalancer.VolumePair pair,
+ DiskBalancerWorkItem item) {
+
+ try {
+ // get stuck if we are asked to sleep.
+ while (sleepInCopyBlocks) {
+ if (!this.shouldRun()) {
+ return;
+ }
+ Thread.sleep(10);
+ }
+ if (delay > 0) {
+ Thread.sleep(delay);
+ }
+ if (shouldRun()) {
+ runCount.incrementAndGet();
+ }
+ } catch (InterruptedException ex) {
+ // An interruption here is harmless to the tests; just log it.
+ LOG.error(ex.toString());
+ }
+ }
+
+ /**
+ * Sets copyBlocks into a runnable state.
+ */
+ @Override
+ public void setRunnable() {
+ this.shouldRun.set(true);
+ }
+
+ /**
+ * Signals copyBlocks to exit.
+ */
+ @Override
+ public void setExitFlag() {
+ this.shouldRun.set(false);
+ }
+
+ /**
+ * Returns the shouldRun boolean flag.
+ */
+ public boolean shouldRun() {
+ return this.shouldRun.get();
+ }
+
+ @Override
+ public FsDatasetSpi getDataset() {
+ return this.dataset;
+ }
+
+ public int getRunCount() {
+ LOG.info("Run count : " + runCount.intValue());
+ return runCount.intValue();
+ }
+ }
+
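+ /**
+ * Builds a DiskBalancer wired to the mock TestMover, along with a plan
+ * that targets this test's datanode and volumes.
+ */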
+ private class MockMoverHelper {
+ private DiskBalancer balancer;
+ private NodePlan plan;
+ private TestMover blockMover;
+
+ public DiskBalancer getBalancer() {
+ return balancer;
+ }
+
+ public NodePlan getPlan() {
+ return plan;
+ }
+
+ public TestMover getBlockMover() {
+ return blockMover;
+ }
+
+ public MockMoverHelper invoke() throws Exception {
+ Configuration conf = new HdfsConfiguration();
+ conf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true);
+ restartDataNode();
+
+ blockMover = new TestMover(dataNode.getFSDataset());
+ blockMover.setRunnable();
+
+ balancer = new DiskBalancerBuilder(conf)
+ .setMover(blockMover)
+ .setNodeID(nodeID)
+ .build();
+
+ DiskBalancerCluster diskBalancerCluster = new DiskBalancerClusterBuilder()
+ .setClusterSource("/diskBalancer/data-cluster-3node-3disk.json")
+ .build();
+
+ plan = new PlanBuilder(diskBalancerCluster, nodeID)
+ .setPathMap(sourceName, destName)
+ .setUUIDMap(sourceUUID, destUUID)
+ .build();
+ return this;
+ }
+ }
+
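+ /**
+ * Fluent builder for DiskBalancer instances used in these tests.
+ */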
+ private class DiskBalancerBuilder {
+ private TestMover blockMover;
+ private Configuration conf;
+ private String nodeID;
+
+ public DiskBalancerBuilder(Configuration conf) {
+ this.conf = conf;
+ }
+
+ public DiskBalancerBuilder setNodeID(String nodeID) {
+ this.nodeID = nodeID;
+ return this;
+ }
+
+ public DiskBalancerBuilder setConf(Configuration conf) {
+ this.conf = conf;
+ return this;
+ }
+
+ public DiskBalancerBuilder setMover(TestMover mover) {
+ this.blockMover = mover;
+ return this;
+ }
+
+ public DiskBalancerBuilder setRunnable() {
+ blockMover.setRunnable();
+ return this;
+ }
+
+ public DiskBalancer build() {
+ Preconditions.checkNotNull(blockMover);
+ return new DiskBalancer(nodeID, conf,
+ blockMover);
+ }
+ }
+
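+ /**
+ * Loads a DiskBalancerCluster model from a JSON resource file.
+ */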
+ private class DiskBalancerClusterBuilder {
+ private String jsonFilePath;
+ private Configuration conf;
+
+ public DiskBalancerClusterBuilder setConf(Configuration conf) {
+ this.conf = conf;
+ return this;
+ }
+
+ public DiskBalancerClusterBuilder setClusterSource(String jsonFilePath)
+ throws Exception {
+ this.jsonFilePath = jsonFilePath;
+ return this;
+ }
+
+ public DiskBalancerCluster build() throws Exception {
+ DiskBalancerCluster diskBalancerCluster;
+ URI clusterJson = getClass().getResource(jsonFilePath).toURI();
+ ClusterConnector jsonConnector =
+ ConnectorFactory.getCluster(clusterJson, conf);
+ diskBalancerCluster = new DiskBalancerCluster(jsonConnector);
+ diskBalancerCluster.readClusterInfo();
+ diskBalancerCluster.setNodesToProcess(diskBalancerCluster.getNodes());
+ return diskBalancerCluster;
+ }
+ }
+
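+ /**
+ * Builds a NodePlan for the first datanode and rewrites its volume
+ * paths and UUIDs to match the live cluster.
+ */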
+ private class PlanBuilder {
+ private String sourcePath;
+ private String destPath;
+ private String sourceUUID;
+ private String destUUID;
+ private DiskBalancerCluster balancerCluster;
+ private String nodeID;
+
+ public PlanBuilder(DiskBalancerCluster balancerCluster, String nodeID) {
+ this.balancerCluster = balancerCluster;
+ this.nodeID = nodeID;
+ }
+
+ public PlanBuilder setPathMap(String sourcePath, String destPath) {
+ this.sourcePath = sourcePath;
+ this.destPath = destPath;
+ return this;
+ }
+
+ public PlanBuilder setUUIDMap(String sourceUUID, String destUUID) {
+ this.sourceUUID = sourceUUID;
+ this.destUUID = destUUID;
+ return this;
+ }
+
+ public NodePlan build() throws Exception {
+ final int dnIndex = 0;
+ Preconditions.checkNotNull(balancerCluster);
+ Preconditions.checkState(nodeID.length() > 0);
+
+ DiskBalancerDataNode node = balancerCluster.getNodes().get(dnIndex);
+ node.setDataNodeUUID(nodeID);
+ GreedyPlanner planner = new GreedyPlanner(10.0f, node);
+ NodePlan plan = new NodePlan(node.getDataNodeName(),
+ node.getDataNodePort());
+ planner.balanceVolumeSet(node, node.getVolumeSets().get("DISK"), plan);
+ setVolumeNames(plan);
+ return plan;
+ }
+
+ private void setVolumeNames(NodePlan plan) {
+ Iterator<Step> iter = plan.getVolumeSetPlans().iterator();
+ while (iter.hasNext()) {
+ MoveStep nextStep = (MoveStep) iter.next();
+ nextStep.getSourceVolume().setPath(sourcePath);
+ nextStep.getSourceVolume().setUuid(sourceUUID);
+ nextStep.getDestinationVolume().setPath(destPath);
+ nextStep.getDestinationVolume().setUuid(destUUID);
+ }
+ }
+
+ }
+}