HDFS-9645. DiskBalancer: Add Query RPC. (Contributed by Anu Engineer)

This commit is contained in:
Arpit Agarwal 2016-01-20 10:47:30 -08:00
parent 0501d430e2
commit 96fe685b7a
8 changed files with 214 additions and 3 deletions

View File

@@ -30,6 +30,7 @@
import org.apache.hadoop.security.KerberosInfo;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenInfo;
import org.apache.hadoop.hdfs.server.datanode.WorkStatus;
/** A client-datanode protocol for block recovery
*/
@@ -177,4 +178,9 @@ void submitDiskBalancerPlan(String planID, long planVersion, long bandwidth,
*/
void cancelDiskBalancePlan(String planID) throws IOException;
/**
* Gets the status of an executing disk balancer plan.
*/
WorkStatus queryDiskBalancerPlan() throws IOException;
}
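For context, here is a minimal caller-side sketch of the new RPC. It assumes a ClientDatanodeProtocol proxy to the target datanode has already been obtained; the QueryPlanExample class and its printPlanStatus helper are hypothetical and not part of this commit:

import java.io.IOException;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.server.datanode.WorkStatus;

/** Hypothetical helper that prints the status of a running plan. */
class QueryPlanExample {
  static void printPlanStatus(ClientDatanodeProtocol dnProxy)
      throws IOException {
    // One round trip to the datanode; the request takes no arguments.
    WorkStatus status = dnProxy.queryDiskBalancerPlan();
    System.out.println("result : " + status.getResult());
    System.out.println("planID : " + status.getPlanID());
    System.out.println("status : " + status.getStatus());
    System.out.println("state  : " + status.getCurrentState());
  }
}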

View File

@@ -54,7 +54,10 @@
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.TriggerBlockReportRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.SubmitDiskBalancerPlanRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.CancelPlanRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.QueryPlanStatusRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.QueryPlanStatusResponseProto;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.WorkStatus;
import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.ProtocolMetaInterface;
@@ -377,4 +380,23 @@ public void cancelDiskBalancePlan(String planID)
throw ProtobufHelper.getRemoteException(e);
}
}
/**
* Gets the status of an executing disk balancer plan.
*/
@Override
public WorkStatus queryDiskBalancerPlan() throws IOException {
try {
QueryPlanStatusRequestProto request =
QueryPlanStatusRequestProto.newBuilder().build();
QueryPlanStatusResponseProto response =
rpcProxy.queryDiskBalancerPlan(NULL_CONTROLLER, request);
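// All fields of the response are optional in the proto definition,
// so fall back to sensible defaults when a field is absent.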
return new WorkStatus(response.hasResult() ? response.getResult() : 0,
response.hasPlanID() ? response.getPlanID() : null,
response.hasStatus() ? response.getStatus() : null,
response.hasCurrentStatus() ? response.getCurrentStatus() : null);
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
}

View File

@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.hadoop.hdfs.server.datanode;
import org.apache.hadoop.classification.InterfaceAudience;
/**
* Helper class that reports how much work has been done by the node.
*/
@InterfaceAudience.Private
public class WorkStatus {
private int result;
private String planID;
private String status;
private String currentState;
/**
* Constructs a WorkStatus object.
*
* @param result - integer result code of the execution
* @param planID - ID of the plan being executed
* @param status - current status of the plan
* @param currentState - current state of execution as a JSON string
*/
public WorkStatus(int result, String planID, String status,
String currentState) {
this.result = result;
this.planID = planID;
this.status = status;
this.currentState = currentState;
}
/**
* Returns the result code.
*
* @return int
*/
public int getResult() {
return result;
}
/**
* Returns planID.
*
* @return String
*/
public String getPlanID() {
return planID;
}
/**
* Returns the status.
*
* @return String
*/
public String getStatus() {
return status;
}
/**
* Gets the current state.
*
* @return JSON String
*/
public String getCurrentState() {
return currentState;
}
}
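A quick usage sketch with hypothetical values, showing the intended shape of the fields (currentState is expected to carry a JSON snapshot):

// All values below are hypothetical and for illustration only.
WorkStatus ws = new WorkStatus(0, "sha512-of-plan-json", "in-progress",
    "{\"bytesMoved\": 0}");
assert ws.getResult() == 0;
assert "in-progress".equals(ws.getStatus());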

View File

@@ -180,6 +180,26 @@ message CancelPlanRequestProto {
message CancelPlanResponseProto {
}
/**
* This message allows a client to query the datanode to see
* whether a disk balancer plan is executing and, if so, what
* its status is.
*/
message QueryPlanStatusRequestProto {
}
/**
* This message describes the status of a plan that is in progress.
*/
message QueryPlanStatusResponseProto {
optional uint32 result = 1;
optional string status = 2;
optional string planID = 3;
optional string currentStatus = 4;
}
/**
* Protocol used from client to the Datanode.
* See the request and response for details of rpc call.
@@ -249,4 +269,10 @@ service ClientDatanodeProtocolService {
*/
rpc cancelDiskBalancerPlan(CancelPlanRequestProto)
returns (CancelPlanResponseProto);
/**
* Gets the status of an executing plan.
*/
rpc queryDiskBalancerPlan(QueryPlanStatusRequestProto)
returns (QueryPlanStatusResponseProto);
}
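Because every field of QueryPlanStatusResponseProto is optional, the generated Java API exposes hasX()/getX() accessor pairs and builder setters. A minimal construction sketch with hypothetical values (the server-side translator later in this commit builds the same message from a WorkStatus):

QueryPlanStatusResponseProto response = QueryPlanStatusResponseProto
    .newBuilder()
    .setResult(0)                  // hypothetical result code
    .setStatus("in-progress")      // hypothetical status string
    .setPlanID("sha512-of-plan")   // hypothetical plan ID
    .setCurrentStatus("{}")        // hypothetical JSON state
    .build();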

View File

@@ -19,3 +19,5 @@ HDFS-1312 Change Log
HDFS-9595. DiskBalancer: Add cancelPlan RPC. (Anu Engineer via
Arpit Agarwal)
HDFS-9645. DiskBalancer: Add Query RPC. (Anu Engineer via Arpit Agarwal)

View File

@@ -51,9 +51,12 @@
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.SubmitDiskBalancerPlanResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.CancelPlanRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.CancelPlanResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.QueryPlanStatusRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.QueryPlanStatusResponseProto;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import org.apache.hadoop.hdfs.server.datanode.WorkStatus;
/**
* Implementation for protobuf service that forwards requests
@@ -281,4 +284,24 @@ public CancelPlanResponseProto cancelDiskBalancerPlan(
}
}
/**
* Gets the status of an executing plan.
*/
@Override
public QueryPlanStatusResponseProto queryDiskBalancerPlan(
RpcController controller, QueryPlanStatusRequestProto request)
throws ServiceException {
try {
WorkStatus result = impl.queryDiskBalancerPlan();
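// Protobuf builder setters reject null, so the WorkStatus fields
// are expected to be non-null here.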
return QueryPlanStatusResponseProto
.newBuilder()
.setResult(result.getResult())
.setPlanID(result.getPlanID())
.setStatus(result.getStatus())
.setCurrentStatus(result.getCurrentState())
.build();
} catch (Exception e) {
throw new ServiceException(e);
}
}
}

View File

@@ -3317,4 +3317,9 @@ public void cancelDiskBalancePlan(String planID) throws
throw new DiskbalancerException("Not Implemented", 0);
}
@Override
public WorkStatus queryDiskBalancerPlan() throws IOException {
checkSuperuserPrivilege();
throw new DiskbalancerException("Not Implemented", 0);
}
}

View File

@@ -35,6 +35,7 @@
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.io.IOException;
import java.net.URI;
public class TestDiskBalancerRPC {
@@ -43,6 +44,7 @@ public class TestDiskBalancerRPC {
private MiniDFSCluster cluster;
private Configuration conf;
@Before
public void setUp() throws Exception {
conf = new HdfsConfiguration();
@@ -113,11 +115,51 @@ public void TestCancelTestRpc() throws Exception {
// Since submitDiskBalancerPlan is not implemented yet, it throws an
// Exception; this will be modified once the actual implementation is
// in place.
try {
dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, plan.toJson());
} catch (DiskbalancerException ex) {
// Ignore this for the time being.
}
thrown.expect(DiskbalancerException.class);
dataNode.cancelDiskBalancePlan(planHash);
}
@Test
public void TestQueryTestRpc() throws Exception {
final int dnIndex = 0;
cluster.restartDataNode(dnIndex);
cluster.waitActive();
ClusterConnector nameNodeConnector =
ConnectorFactory.getCluster(cluster.getFileSystem(0).getUri(), conf);
DiskBalancerCluster diskBalancerCluster =
new DiskBalancerCluster(nameNodeConnector);
diskBalancerCluster.readClusterInfo();
Assert.assertEquals(cluster.getDataNodes().size(),
diskBalancerCluster.getNodes().size());
diskBalancerCluster.setNodesToProcess(diskBalancerCluster.getNodes());
DiskBalancerDataNode node = diskBalancerCluster.getNodes().get(0);
GreedyPlanner planner = new GreedyPlanner(10.0f, node);
NodePlan plan =
new NodePlan(node.getDataNodeName(), node.getDataNodePort());
planner.balanceVolumeSet(node, node.getVolumeSets().get("DISK"), plan);
final int planVersion = 0; // So far we support only one version.
DataNode dataNode = cluster.getDataNodes().get(dnIndex);
String planHash = DigestUtils.sha512Hex(plan.toJson());
// Since submitDiskBalancerPlan is not implemented yet, it throws an
// Exception; this will be modified once the actual implementation is
// in place.
try {
dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, plan.toJson());
} catch (DiskbalancerException ex) {
// Ignore this for the time being.
}
// TODO: This will be fixed when we have an implementation for this
// function on the server side.
thrown.expect(DiskbalancerException.class);
dataNode.queryDiskBalancerPlan();
}
}