HDFS-10399. DiskBalancer: Add JMX for DiskBalancer. Contributed by Anu Engineer.

This commit is contained in:
Anu Engineer 2016-05-20 08:53:28 -07:00 committed by Arpit Agarwal
parent 1b39b283c7
commit 5df2d2b8fd
7 changed files with 77 additions and 11 deletions

View File

@ -34,7 +34,7 @@
@InterfaceStability.Unstable @InterfaceStability.Unstable
@JsonInclude(JsonInclude.Include.NON_DEFAULT) @JsonInclude(JsonInclude.Include.NON_DEFAULT)
public class DiskBalancerWorkItem { public class DiskBalancerWorkItem {
private final long bytesToCopy; private long bytesToCopy;
private long bytesCopied; private long bytesCopied;
private long errorCount; private long errorCount;
private String errMsg; private String errMsg;
@ -44,6 +44,14 @@ public class DiskBalancerWorkItem {
private long tolerancePercent; private long tolerancePercent;
private long bandwidth; private long bandwidth;
/**
* Empty constructor for Json serialization.
*/
public DiskBalancerWorkItem() {
}
/** /**
* Constructs a DiskBalancerWorkItem. * Constructs a DiskBalancerWorkItem.
* *

View File

@ -126,11 +126,29 @@ public List<DiskBalancerWorkEntry> getCurrentState() {
* *
* @throws IOException * @throws IOException
**/ **/
public String getCurrentStateString() throws IOException { public String currentStateString() throws IOException {
ObjectMapper mapper = new ObjectMapper(); ObjectMapper mapper = new ObjectMapper();
return mapper.writeValueAsString(currentState); return mapper.writeValueAsString(currentState);
} }
public String toJsonString() throws IOException {
ObjectMapper mapper = new ObjectMapper();
return mapper.writeValueAsString(this);
}
/**
* Returns a DiskBalancerWorkStatus object from the Json .
* @param json - json String
* @return DiskBalancerWorkStatus
* @throws IOException
*/
public static DiskBalancerWorkStatus parseJson(String json) throws
IOException {
ObjectMapper mapper = new ObjectMapper();
return mapper.readValue(json, DiskBalancerWorkStatus.class);
}
/** /**
* Adds a new work entry to the list. * Adds a new work entry to the list.
@ -176,6 +194,16 @@ public static class DiskBalancerWorkEntry {
private String destPath; private String destPath;
private DiskBalancerWorkItem workItem; private DiskBalancerWorkItem workItem;
/**
* Constructor needed for json serialization.
*/
public DiskBalancerWorkEntry() {
}
public DiskBalancerWorkEntry(String workItem) throws IOException {
this.workItem = DiskBalancerWorkItem.parseJson(workItem);
}
/** /**
* Constructs a Work Entry class. * Constructs a Work Entry class.
* *

View File

@ -298,7 +298,7 @@ public QueryPlanStatusResponseProto queryDiskBalancerPlan(
.newBuilder() .newBuilder()
.setResult(result.getResult().getIntResult()) .setResult(result.getResult().getIntResult())
.setPlanID(result.getPlanID()) .setPlanID(result.getPlanID())
.setCurrentStatus(result.getCurrentStateString()) .setCurrentStatus(result.currentStateString())
.build(); .build();
} catch (Exception e) { } catch (Exception e) {
throw new ServiceException(e); throw new ServiceException(e);

View File

@ -2958,6 +2958,16 @@ public String getVolumeInfo() {
public synchronized String getClusterId() { public synchronized String getClusterId() {
return clusterId; return clusterId;
} }
@Override // DataNodeMXBean
public String getDiskBalancerStatus() {
try {
return this.diskBalancer.queryWorkStatus().toJsonString();
} catch (IOException ex) {
LOG.debug("Reading diskbalancer Status failed. ex:{}", ex);
return "";
}
}
public void refreshNamenodes(Configuration conf) throws IOException { public void refreshNamenodes(Configuration conf) throws IOException {
blockPoolManager.refreshNamenodes(conf); blockPoolManager.refreshNamenodes(conf);

View File

@ -90,4 +90,12 @@ public interface DataNodeMXBean {
* Gets the network error counts on a per-Datanode basis. * Gets the network error counts on a per-Datanode basis.
*/ */
public Map<String, Map<String, Long>> getDatanodeNetworkCounts(); public Map<String, Map<String, Long>> getDatanodeNetworkCounts();
/**
* Gets the diskBalancer Status.
* Please see implementation for the format of the returned information.
*
* @return DiskBalancer Status
*/
String getDiskBalancerStatus();
} }

View File

@ -189,6 +189,12 @@ public void TestDiskBalancerEndToEnd() throws Exception {
// Submit the plan and wait till the execution is done. // Submit the plan and wait till the execution is done.
newDN.submitDiskBalancerPlan(planID, 1, planJson, false); newDN.submitDiskBalancerPlan(planID, 1, planJson, false);
String jmxString = newDN.getDiskBalancerStatus();
assertNotNull(jmxString);
DiskBalancerWorkStatus status =
DiskBalancerWorkStatus.parseJson(jmxString);
DiskBalancerWorkStatus realStatus = newDN.queryDiskBalancerPlan();
assertEquals(realStatus.getPlanID(), status.getPlanID());
GenericTestUtils.waitFor(new Supplier<Boolean>() { GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override @Override

View File

@ -20,6 +20,7 @@
package org.apache.hadoop.hdfs.server.diskbalancer; package org.apache.hadoop.hdfs.server.diskbalancer;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import org.apache.commons.codec.digest.DigestUtils; import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -40,6 +41,7 @@
import org.apache.hadoop.hdfs.server.diskbalancer.planner.MoveStep; import org.apache.hadoop.hdfs.server.diskbalancer.planner.MoveStep;
import org.apache.hadoop.hdfs.server.diskbalancer.planner.NodePlan; import org.apache.hadoop.hdfs.server.diskbalancer.planner.NodePlan;
import org.apache.hadoop.hdfs.server.diskbalancer.planner.Step; import org.apache.hadoop.hdfs.server.diskbalancer.planner.Step;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Time;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
@ -53,7 +55,6 @@
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus.Result.NO_PLAN; import static org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus.Result.NO_PLAN;
import static org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus.Result.PLAN_DONE;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
@ -156,15 +157,20 @@ public void testResubmitDiskBalancerPlan() throws Exception {
public void testSubmitDiskBalancerPlan() throws Exception { public void testSubmitDiskBalancerPlan() throws Exception {
MockMoverHelper mockMoverHelper = new MockMoverHelper().invoke(); MockMoverHelper mockMoverHelper = new MockMoverHelper().invoke();
NodePlan plan = mockMoverHelper.getPlan(); NodePlan plan = mockMoverHelper.getPlan();
DiskBalancer balancer = mockMoverHelper.getBalancer(); final DiskBalancer balancer = mockMoverHelper.getBalancer();
executeSubmitPlan(plan, balancer); executeSubmitPlan(plan, balancer);
int counter = 0; GenericTestUtils.waitFor(new Supplier<Boolean>() {
while ((balancer.queryWorkStatus().getResult() != PLAN_DONE) && @Override
(counter < 3)) { public Boolean get() {
Thread.sleep(1000); try {
counter++; return balancer.queryWorkStatus().getResult() ==
} DiskBalancerWorkStatus.Result.PLAN_DONE;
} catch (IOException ex) {
return false;
}
}
}, 1000, 100000);
// Asserts that submit plan caused an execution in the background. // Asserts that submit plan caused an execution in the background.
assertTrue(mockMoverHelper.getBlockMover().getRunCount() == 1); assertTrue(mockMoverHelper.getBlockMover().getRunCount() == 1);