diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancerWorkItem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancerWorkItem.java index 7381499f67..fe908d8057 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancerWorkItem.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancerWorkItem.java @@ -34,7 +34,7 @@ @InterfaceStability.Unstable @JsonInclude(JsonInclude.Include.NON_DEFAULT) public class DiskBalancerWorkItem { - private final long bytesToCopy; + private long bytesToCopy; private long bytesCopied; private long errorCount; private String errMsg; @@ -44,6 +44,14 @@ public class DiskBalancerWorkItem { private long tolerancePercent; private long bandwidth; + /** + * Empty constructor for Json serialization. + */ + public DiskBalancerWorkItem() { + + } + + /** * Constructs a DiskBalancerWorkItem. * diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancerWorkStatus.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancerWorkStatus.java index d6943cf37a..ca5e5f0675 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancerWorkStatus.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancerWorkStatus.java @@ -126,11 +126,29 @@ public List getCurrentState() { * * @throws IOException **/ - public String getCurrentStateString() throws IOException { + public String currentStateString() throws IOException { ObjectMapper mapper = new ObjectMapper(); return mapper.writeValueAsString(currentState); } + public String toJsonString() throws IOException { + ObjectMapper mapper = new ObjectMapper(); + return mapper.writeValueAsString(this); + + } + + /** + * Returns a DiskBalancerWorkStatus object from the Json . + * @param json - json String + * @return DiskBalancerWorkStatus + * @throws IOException + */ + public static DiskBalancerWorkStatus parseJson(String json) throws + IOException { + ObjectMapper mapper = new ObjectMapper(); + return mapper.readValue(json, DiskBalancerWorkStatus.class); + } + /** * Adds a new work entry to the list. @@ -176,6 +194,16 @@ public static class DiskBalancerWorkEntry { private String destPath; private DiskBalancerWorkItem workItem; + /** + * Constructor needed for json serialization. + */ + public DiskBalancerWorkEntry() { + } + + public DiskBalancerWorkEntry(String workItem) throws IOException { + this.workItem = DiskBalancerWorkItem.parseJson(workItem); + } + /** * Constructs a Work Entry class. * diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java index 482e86fea7..b716347663 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java @@ -298,7 +298,7 @@ public QueryPlanStatusResponseProto queryDiskBalancerPlan( .newBuilder() .setResult(result.getResult().getIntResult()) .setPlanID(result.getPlanID()) - .setCurrentStatus(result.getCurrentStateString()) + .setCurrentStatus(result.currentStateString()) .build(); } catch (Exception e) { throw new ServiceException(e); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index d6be2e0559..94de27b6cf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -2958,6 +2958,16 @@ public String getVolumeInfo() { public synchronized String getClusterId() { return clusterId; } + + @Override // DataNodeMXBean + public String getDiskBalancerStatus() { + try { + return this.diskBalancer.queryWorkStatus().toJsonString(); + } catch (IOException ex) { + LOG.debug("Reading diskbalancer Status failed. ex:{}", ex); + return ""; + } + } public void refreshNamenodes(Configuration conf) throws IOException { blockPoolManager.refreshNamenodes(conf); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeMXBean.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeMXBean.java index 0119dc8737..a59dc1ccea 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeMXBean.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeMXBean.java @@ -90,4 +90,12 @@ public interface DataNodeMXBean { * Gets the network error counts on a per-Datanode basis. */ public Map> getDatanodeNetworkCounts(); + + /** + * Gets the diskBalancer Status. + * Please see implementation for the format of the returned information. + * + * @return DiskBalancer Status + */ + String getDiskBalancerStatus(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancer.java index f50637c9e4..bd969c7dc7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancer.java @@ -189,6 +189,12 @@ public void TestDiskBalancerEndToEnd() throws Exception { // Submit the plan and wait till the execution is done. newDN.submitDiskBalancerPlan(planID, 1, planJson, false); + String jmxString = newDN.getDiskBalancerStatus(); + assertNotNull(jmxString); + DiskBalancerWorkStatus status = + DiskBalancerWorkStatus.parseJson(jmxString); + DiskBalancerWorkStatus realStatus = newDN.queryDiskBalancerPlan(); + assertEquals(realStatus.getPlanID(), status.getPlanID()); GenericTestUtils.waitFor(new Supplier() { @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerWithMockMover.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerWithMockMover.java index 1cc90e5c6c..491fccb009 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerWithMockMover.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerWithMockMover.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.diskbalancer; import com.google.common.base.Preconditions; +import com.google.common.base.Supplier; import org.apache.commons.codec.digest.DigestUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -40,6 +41,7 @@ import org.apache.hadoop.hdfs.server.diskbalancer.planner.MoveStep; import org.apache.hadoop.hdfs.server.diskbalancer.planner.NodePlan; import org.apache.hadoop.hdfs.server.diskbalancer.planner.Step; +import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.util.Time; import org.junit.After; import org.junit.Before; @@ -53,7 +55,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import static org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus.Result.NO_PLAN; -import static org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus.Result.PLAN_DONE; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; @@ -156,15 +157,20 @@ public void testResubmitDiskBalancerPlan() throws Exception { public void testSubmitDiskBalancerPlan() throws Exception { MockMoverHelper mockMoverHelper = new MockMoverHelper().invoke(); NodePlan plan = mockMoverHelper.getPlan(); - DiskBalancer balancer = mockMoverHelper.getBalancer(); + final DiskBalancer balancer = mockMoverHelper.getBalancer(); executeSubmitPlan(plan, balancer); - int counter = 0; - while ((balancer.queryWorkStatus().getResult() != PLAN_DONE) && - (counter < 3)) { - Thread.sleep(1000); - counter++; - } + GenericTestUtils.waitFor(new Supplier() { + @Override + public Boolean get() { + try { + return balancer.queryWorkStatus().getResult() == + DiskBalancerWorkStatus.Result.PLAN_DONE; + } catch (IOException ex) { + return false; + } + } + }, 1000, 100000); // Asserts that submit plan caused an execution in the background. assertTrue(mockMoverHelper.getBlockMover().getRunCount() == 1);