From 2b1b2faf76a7ff148650a7836935a85439f60c49 Mon Sep 17 00:00:00 2001 From: Arpit Agarwal Date: Mon, 22 Feb 2016 11:45:51 -0800 Subject: [PATCH] HDFS-9671. DiskBalancer: SubmitPlan implementation. (Contributed by Anu Engineer) --- .../hdfs/protocol/ClientDatanodeProtocol.java | 4 +- .../ClientDatanodeProtocolTranslatorPB.java | 10 +- .../server/datanode/DiskBalancerWorkItem.java | 160 ++++++ ...tatus.java => DiskBalancerWorkStatus.java} | 16 +- .../hadoop-hdfs/HDFS-1312_CHANGES.txt | 7 +- .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 5 + ...atanodeProtocolServerSideTranslatorPB.java | 4 +- .../hadoop/hdfs/server/datanode/DataNode.java | 56 +- .../hdfs/server/datanode/DiskBalancer.java | 542 ++++++++++++++++++ .../diskbalancer/DiskBalancerConstants.java | 9 + ...eption.java => DiskBalancerException.java} | 64 ++- .../datamodel/DiskBalancerCluster.java | 14 + .../diskbalancer/TestDiskBalancerRPC.java | 28 +- 13 files changed, 846 insertions(+), 73 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancerWorkItem.java rename hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/{WorkStatus.java => DiskBalancerWorkStatus.java} (82%) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java rename hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/{DiskbalancerException.java => DiskBalancerException.java} (54%) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java index dede89e7c5..d8df7fb205 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java @@ -30,7 +30,7 @@ import org.apache.hadoop.hdfs.security.token.block.BlockTokenSelector; import org.apache.hadoop.security.KerberosInfo; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenInfo; -import org.apache.hadoop.hdfs.server.datanode.WorkStatus; +import org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus; /** An client-datanode protocol for block recovery */ @@ -182,7 +182,7 @@ public interface ClientDatanodeProtocol { /** * Gets the status of an executing diskbalancer Plan. */ - WorkStatus queryDiskBalancerPlan() throws IOException; + DiskBalancerWorkStatus queryDiskBalancerPlan() throws IOException; /** * Gets a run-time configuration value from running diskbalancer instance. diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java index e7e0d945bb..786d834d8d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java @@ -59,7 +59,7 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.QueryP import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.DiskBalancerSettingRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.DiskBalancerSettingResponseProto; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; -import org.apache.hadoop.hdfs.server.datanode.WorkStatus; +import org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus; import org.apache.hadoop.ipc.ProtobufHelper; import org.apache.hadoop.ipc.ProtobufRpcEngine; import org.apache.hadoop.ipc.ProtocolMetaInterface; @@ -345,8 +345,7 @@ public class ClientDatanodeProtocolTranslatorPB implements * to zero allows datanode to use the value defined in * configration. * @param plan - Actual plan. - * @return Success or throws Exception. - * @throws Exception + * @throws IOException */ @Override public void submitDiskBalancerPlan(String planID, long planVersion, @@ -387,13 +386,14 @@ public class ClientDatanodeProtocolTranslatorPB implements * Gets the status of an executing diskbalancer Plan. */ @Override - public WorkStatus queryDiskBalancerPlan() throws IOException { + public DiskBalancerWorkStatus queryDiskBalancerPlan() throws IOException { try { QueryPlanStatusRequestProto request = QueryPlanStatusRequestProto.newBuilder().build(); QueryPlanStatusResponseProto response = rpcProxy.queryDiskBalancerPlan(NULL_CONTROLLER, request); - return new WorkStatus(response.hasResult() ? response.getResult() : 0, + return new DiskBalancerWorkStatus(response.hasResult() ? + response.getResult() : 0, response.hasPlanID() ? response.getPlanID() : null, response.hasStatus() ? response.getStatus() : null, response.hasCurrentStatus() ? response.getCurrentStatus() : null); diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancerWorkItem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancerWorkItem.java new file mode 100644 index 0000000000..11730e23bd --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancerWorkItem.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.hadoop.hdfs.server.datanode; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.codehaus.jackson.map.ObjectMapper; + +import java.io.IOException; + +/** + * Keeps track of how much work has finished. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class DiskBalancerWorkItem { + private final long bytesToCopy; + private long bytesCopied; + private long errorCount; + private String errMsg; + private long blocksCopied; + + /** + * Constructs a DiskBalancerWorkItem. + * + * @param bytesToCopy - Total bytes to copy from a disk + * @param bytesCopied - Copied So far. + */ + public DiskBalancerWorkItem(long bytesToCopy, long bytesCopied) { + this.bytesToCopy = bytesToCopy; + this.bytesCopied = bytesCopied; + } + + /** + * Reads a DiskBalancerWorkItem Object from a Json String. + * + * @param json - Json String. + * @return DiskBalancerWorkItem Object + * @throws IOException + */ + public static DiskBalancerWorkItem parseJson(String json) throws IOException { + Preconditions.checkNotNull(json); + ObjectMapper mapper = new ObjectMapper(); + return mapper.readValue(json, DiskBalancerWorkItem.class); + } + + /** + * Gets the error message. + */ + public String getErrMsg() { + return errMsg; + } + + /** + * Sets the error message. + * + * @param errMsg - Msg. + */ + public void setErrMsg(String errMsg) { + this.errMsg = errMsg; + } + + /** + * Returns the number of errors encountered. + * + * @return long + */ + public long getErrorCount() { + return errorCount; + } + + /** + * Incs Error Count. + */ + public void incErrorCount() { + this.errorCount++; + } + + /** + * Returns bytes copied so far. + * + * @return long + */ + public long getBytesCopied() { + return bytesCopied; + } + + /** + * Sets bytes copied so far. + * + * @param bytesCopied - long + */ + public void setBytesCopied(long bytesCopied) { + this.bytesCopied = bytesCopied; + } + + /** + * Increments bytesCopied by delta. + * + * @param delta - long + */ + public void incCopiedSoFar(long delta) { + this.bytesCopied += delta; + } + + /** + * Returns bytes to copy. + * + * @return - long + */ + public long getBytesToCopy() { + return bytesToCopy; + } + + /** + * Returns number of blocks copied for this DiskBalancerWorkItem. + * + * @return long count of blocks. + */ + public long getBlocksCopied() { + return blocksCopied; + } + + /** + * increments the number of blocks copied. + */ + public void incBlocksCopied() { + blocksCopied++; + } + + /** + * returns a serialized json string. + * + * @return String - json + * @throws IOException + */ + public String toJson() throws IOException { + ObjectMapper mapper = new ObjectMapper(); + return mapper.writeValueAsString(this); + } + +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/WorkStatus.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancerWorkStatus.java similarity index 82% rename from hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/WorkStatus.java rename to hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancerWorkStatus.java index 259a311f35..6b29ce8aca 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/WorkStatus.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancerWorkStatus.java @@ -20,16 +20,18 @@ package org.apache.hadoop.hdfs.server.datanode; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; /** * Helper class that reports how much work has has been done by the node. */ @InterfaceAudience.Private -public class WorkStatus { - private int result; - private String planID; - private String status; - private String currentState; +@InterfaceStability.Unstable +public class DiskBalancerWorkStatus { + private final int result; + private final String planID; + private final String status; + private final String currentState; /** * Constructs a workStatus Object. @@ -39,8 +41,8 @@ public class WorkStatus { * @param status - Current Status * @param currentState - Current State */ - public WorkStatus(int result, String planID, String status, - String currentState) { + public DiskBalancerWorkStatus(int result, String planID, String status, + String currentState) { this.result = result; this.planID = planID; this.status = status; diff --git a/hadoop-hdfs-project/hadoop-hdfs/HDFS-1312_CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/HDFS-1312_CHANGES.txt index d3bdedffc8..27de7d0924 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/HDFS-1312_CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/HDFS-1312_CHANGES.txt @@ -21,6 +21,9 @@ HDFS-1312 Change Log HDFS-9645. DiskBalancer: Add Query RPC. (Anu Engineer via Arpit Agarwal) - HDFS-9647. DiskBalancer: Add getRuntimeSettings. (Anu Engineer - via Arpit Agarwal) + HDFS-9647. DiskBalancer: Add getRuntimeSettings. (Anu Engineer via + Arpit Agarwal) + + HDFS-9671skBalancer: SubmitPlan implementation. (Anu Engineer via + Arpit Agarwal) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index f18a6c6537..224ab3d53b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -930,6 +930,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys { DFS_DATANODE_TRANSFER_SOCKET_RECV_BUFFER_SIZE_DEFAULT = HdfsConstants.DEFAULT_DATA_SOCKET_SIZE; + // Disk Balancer Keys + public static final String DFS_DISK_BALANCER_ENABLED = + "dfs.disk.balancer.enabled"; + public static final boolean DFS_DISK_BALANCER_ENABLED_DEFAULT = false; + // dfs.client.retry confs are moved to HdfsClientConfigKeys.Retry @Deprecated public static final String DFS_CLIENT_RETRY_POLICY_ENABLED_KEY diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java index 32466333b1..692fca3182 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java @@ -57,7 +57,7 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.DiskBa import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.DiskBalancerSettingResponseProto; import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; -import org.apache.hadoop.hdfs.server.datanode.WorkStatus; +import org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus; /** * Implementation for protobuf service that forwards requests @@ -293,7 +293,7 @@ public class ClientDatanodeProtocolServerSideTranslatorPB implements RpcController controller, QueryPlanStatusRequestProto request) throws ServiceException { try { - WorkStatus result = impl.queryDiskBalancerPlan(); + DiskBalancerWorkStatus result = impl.queryDiskBalancerPlan(); return QueryPlanStatusResponseProto .newBuilder() .setResult(result.getResult()) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index 836dc811be..8d805a1c0e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -169,7 +169,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics; import org.apache.hadoop.hdfs.server.datanode.web.DatanodeHttpServer; -import org.apache.hadoop.hdfs.server.diskbalancer.DiskbalancerException; +import org.apache.hadoop.hdfs.server.diskbalancer.DiskBalancerException; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; @@ -386,6 +386,8 @@ public class DataNode extends ReconfigurableBase private static final int NUM_CORES = Runtime.getRuntime() .availableProcessors(); private static final double CONGESTION_RATIO = 1.5; + private DiskBalancer diskBalancer; + private static Tracer createTracer(Configuration conf) { return new Tracer.Builder("DataNode"). @@ -1022,7 +1024,33 @@ public class DataNode extends ReconfigurableBase directoryScanner.shutdown(); } } - + + /** + * Initilizes {@link DiskBalancer}. + * @param data - FSDataSet + * @param conf - Config + */ + private synchronized void initDiskBalancer(FsDatasetSpi data, + Configuration conf) { + if (this.diskBalancer != null) { + return; + } + + DiskBalancer.BlockMover mover = new DiskBalancer.DiskBalancerMover(data, + conf); + this.diskBalancer = new DiskBalancer(getDatanodeUuid(), conf, mover); + } + + /** + * Shutdown disk balancer. + */ + private synchronized void shutdownDiskBalancer() { + if (this.diskBalancer != null) { + this.diskBalancer.shutdown(); + this.diskBalancer = null; + } + } + private void initDataXceiver(Configuration conf) throws IOException { // find free port or use privileged port provided TcpPeerServer tcpPeerServer; @@ -1530,6 +1558,7 @@ public class DataNode extends ReconfigurableBase data.addBlockPool(nsInfo.getBlockPoolID(), conf); blockScanner.enableBlockPoolId(bpos.getBlockPoolId()); initDirectoryScanner(conf); + initDiskBalancer(data, conf); } List getAllBpOs() { @@ -1867,6 +1896,7 @@ public class DataNode extends ReconfigurableBase // Terminate directory scanner and block scanner shutdownPeriodicScanners(); + shutdownDiskBalancer(); // Stop the web server if (httpServer != null) { @@ -3296,31 +3326,30 @@ public class DataNode extends ReconfigurableBase * @param bandwidth - Max disk bandwidth to use, 0 means use value defined * in the configration. * @param plan - Actual plan - * @return success or throws an exception. - * @throws Exception + * @throws IOException */ @Override public void submitDiskBalancerPlan(String planID, long planVersion, long bandwidth, String plan) throws IOException { - // TODO : This will be replaced with actual code later. - // Right now throwing DiskbalancerException instead - // NotImplementedException to indicate the eventually disk balancer code - // will throw DiskbalancerException. - throw new DiskbalancerException("Not Implemented", 0); + checkSuperuserPrivilege(); + // TODO : Support force option + this.diskBalancer.submitPlan(planID, planVersion, plan, bandwidth, false); } @Override public void cancelDiskBalancePlan(String planID) throws IOException { checkSuperuserPrivilege(); - throw new DiskbalancerException("Not Implemented", 0); + throw new DiskBalancerException("Not Implemented", + DiskBalancerException.Result.INTERNAL_ERROR); } @Override - public WorkStatus queryDiskBalancerPlan() throws IOException { + public DiskBalancerWorkStatus queryDiskBalancerPlan() throws IOException { checkSuperuserPrivilege(); - throw new DiskbalancerException("Not Implemented", 0); + throw new DiskBalancerException("Not Implemented", + DiskBalancerException.Result.INTERNAL_ERROR); } /** @@ -3334,6 +3363,7 @@ public class DataNode extends ReconfigurableBase @Override public String getDiskBalancerSetting(String key) throws IOException { checkSuperuserPrivilege(); - throw new DiskbalancerException("Not Implemented", 0); + throw new DiskBalancerException("Not Implemented", + DiskBalancerException.Result.INTERNAL_ERROR); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java new file mode 100644 index 0000000000..1c8ba4cf0b --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java @@ -0,0 +1,542 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.hadoop.hdfs.server.datanode; + +import com.google.common.base.Preconditions; +import org.apache.commons.codec.digest.DigestUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; +import org.apache.hadoop.hdfs.server.diskbalancer.DiskBalancerConstants; +import org.apache.hadoop.hdfs.server.diskbalancer.DiskBalancerException; +import org.apache.hadoop.hdfs.server.diskbalancer.planner.NodePlan; +import org.apache.hadoop.hdfs.server.diskbalancer.planner.Step; +import org.apache.hadoop.util.Time; + +import java.io.IOException; +import java.nio.charset.Charset; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.*; +import java.util.concurrent.locks.ReentrantLock; + +/** + * Worker class for Disk Balancer. + *

+ * Here is the high level logic executed by this class. Users can submit disk + * balancing plans using submitPlan calls. After a set of sanity checks the plan + * is admitted and put into workMap. + *

+ * The executePlan launches a thread that picks up work from workMap and hands + * it over to the BlockMover#copyBlocks function. + *

+ * Constraints : + *

+ * Only one plan can be executing in a datanode at any given time. This is + * ensured by checking the future handle of the worker thread in submitPlan. + */ +@InterfaceAudience.Private +public class DiskBalancer { + + private static final Log LOG = LogFactory.getLog(DiskBalancer.class); + private final FsDatasetSpi dataset; + private final String dataNodeUUID; + private final BlockMover blockMover; + private final ReentrantLock lock; + private final ConcurrentHashMap workMap; + private boolean isDiskBalancerEnabled = false; + private ExecutorService scheduler; + private Future future; + private String planID; + + /** + * Constructs a Disk Balancer object. This object takes care of reading a + * NodePlan and executing it against a set of volumes. + * + * @param dataNodeUUID - Data node UUID + * @param conf - Hdfs Config + * @param blockMover - Object that supports moving blocks. + */ + public DiskBalancer(String dataNodeUUID, + Configuration conf, BlockMover blockMover) { + this.blockMover = blockMover; + this.dataset = this.blockMover.getDataset(); + this.dataNodeUUID = dataNodeUUID; + scheduler = Executors.newSingleThreadExecutor(); + lock = new ReentrantLock(); + workMap = new ConcurrentHashMap<>(); + this.isDiskBalancerEnabled = conf.getBoolean( + DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, + DFSConfigKeys.DFS_DISK_BALANCER_ENABLED_DEFAULT); + } + + /** + * Shutdown disk balancer services. + */ + public void shutdown() { + lock.lock(); + try { + this.isDiskBalancerEnabled = false; + if ((this.future != null) && (!this.future.isDone())) { + this.blockMover.setExitFlag(); + shutdownExecutor(); + } + } finally { + lock.unlock(); + } + } + + /** + * Shutdown the executor. + */ + private void shutdownExecutor() { + scheduler.shutdown(); + try { + if(!scheduler.awaitTermination(30, TimeUnit.SECONDS)) { + scheduler.shutdownNow(); + if (!scheduler.awaitTermination(30, TimeUnit.SECONDS)) { + LOG.error("Disk Balancer : Scheduler did not terminate."); + } + } + } catch (InterruptedException ex) { + scheduler.shutdownNow(); + Thread.currentThread().interrupt(); + } + } + + /** + * Takes a client submitted plan and converts into a set of work items that + * can be executed by the blockMover. + * + * @param planID - A SHA512 of the plan string + * @param planVersion - version of the plan string - for future use. + * @param plan - Actual Plan + * @param bandwidth - BytesPerSec to copy + * @param force - Skip some validations and execute the plan file. + * @throws DiskBalancerException + */ + public void submitPlan(String planID, long planVersion, String plan, + long bandwidth, boolean force) + throws DiskBalancerException { + + lock.lock(); + try { + checkDiskBalancerEnabled(); + if ((this.future != null) && (!this.future.isDone())) { + LOG.error("Disk Balancer - Executing another plan, submitPlan failed."); + throw new DiskBalancerException("Executing another plan", + DiskBalancerException.Result.PLAN_ALREADY_IN_PROGRESS); + } + NodePlan nodePlan = + verifyPlan(planID, planVersion, plan, bandwidth, force); + createWorkPlan(nodePlan); + this.planID = planID; + executePlan(); + } finally { + lock.unlock(); + } + } + + /** + * Throws if Disk balancer is disabled. + * + * @throws DiskBalancerException + */ + private void checkDiskBalancerEnabled() + throws DiskBalancerException { + if (!isDiskBalancerEnabled) { + LOG.error("Disk Balancer is not enabled."); + throw new DiskBalancerException("Disk Balancer is not enabled.", + DiskBalancerException.Result.DISK_BALANCER_NOT_ENABLED); + } + } + + /** + * Verifies that user provided plan is valid. + * + * @param planID - SHA 512 of the plan. + * @param planVersion - Version of the plan, for future use. + * @param plan - Plan String in Json. + * @param bandwidth - Max disk bandwidth to use per second. + * @param force - Skip verifying when the plan was generated. + * @return a NodePlan Object. + * @throws DiskBalancerException + */ + private NodePlan verifyPlan(String planID, long planVersion, String plan, + long bandwidth, boolean force) + throws DiskBalancerException { + + Preconditions.checkState(lock.isHeldByCurrentThread()); + verifyPlanVersion(planVersion); + NodePlan nodePlan = verifyPlanHash(planID, plan); + if (!force) { + verifyTimeStamp(nodePlan); + } + verifyNodeUUID(nodePlan); + return nodePlan; + } + + /** + * Verifies the plan version is something that we support. + * + * @param planVersion - Long version. + * @throws DiskBalancerException + */ + private void verifyPlanVersion(long planVersion) + throws DiskBalancerException { + if ((planVersion < DiskBalancerConstants.DISKBALANCER_MIN_VERSION) || + (planVersion > DiskBalancerConstants.DISKBALANCER_MAX_VERSION)) { + LOG.error("Disk Balancer - Invalid plan version."); + throw new DiskBalancerException("Invalid plan version.", + DiskBalancerException.Result.INVALID_PLAN_VERSION); + } + } + + /** + * Verifies that plan matches the SHA512 provided by the client. + * + * @param planID - Sha512 Hex Bytes + * @param plan - Plan String + * @throws DiskBalancerException + */ + private NodePlan verifyPlanHash(String planID, String plan) + throws DiskBalancerException { + final long sha512Length = 128; + if (plan == null || plan.length() == 0) { + LOG.error("Disk Balancer - Invalid plan."); + throw new DiskBalancerException("Invalid plan.", + DiskBalancerException.Result.INVALID_PLAN); + } + + if ((planID == null) || + (planID.length() != sha512Length) || + !DigestUtils.sha512Hex(plan.getBytes(Charset.forName("UTF-8"))) + .equalsIgnoreCase(planID)) { + LOG.error("Disk Balancer - Invalid plan hash."); + throw new DiskBalancerException("Invalid or mis-matched hash.", + DiskBalancerException.Result.INVALID_PLAN_HASH); + } + + try { + return NodePlan.parseJson(plan); + } catch (IOException ex) { + throw new DiskBalancerException("Parsing plan failed.", ex, + DiskBalancerException.Result.MALFORMED_PLAN); + } + } + + /** + * Verifies that this plan is not older than 24 hours. + * + * @param plan - Node Plan + */ + private void verifyTimeStamp(NodePlan plan) throws DiskBalancerException { + long now = Time.now(); + long planTime = plan.getTimeStamp(); + + // TODO : Support Valid Plan hours as a user configurable option. + if ((planTime + + (TimeUnit.HOURS.toMillis( + DiskBalancerConstants.DISKBALANCER_VALID_PLAN_HOURS))) < now) { + String hourString = "Plan was generated more than " + + Integer.toString(DiskBalancerConstants.DISKBALANCER_VALID_PLAN_HOURS) + + " hours ago."; + LOG.error("Disk Balancer - " + hourString); + throw new DiskBalancerException(hourString, + DiskBalancerException.Result.OLD_PLAN_SUBMITTED); + } + } + + /** + * Verify Node UUID. + * + * @param plan - Node Plan + */ + private void verifyNodeUUID(NodePlan plan) throws DiskBalancerException { + if ((plan.getNodeUUID() == null) || + !plan.getNodeUUID().equals(this.dataNodeUUID)) { + LOG.error("Disk Balancer - Plan was generated for another node."); + throw new DiskBalancerException( + "Plan was generated for another node.", + DiskBalancerException.Result.DATANODE_ID_MISMATCH); + } + } + + /** + * Convert a node plan to DiskBalancerWorkItem that Datanode can execute. + * + * @param plan - Node Plan + */ + private void createWorkPlan(NodePlan plan) throws DiskBalancerException { + Preconditions.checkState(lock.isHeldByCurrentThread()); + + // Cleanup any residual work in the map. + workMap.clear(); + Map pathMap = getStorageIDToVolumeMap(); + + for (Step step : plan.getVolumeSetPlans()) { + String sourceuuid = step.getSourceVolume().getUuid(); + String destinationuuid = step.getDestinationVolume().getUuid(); + + FsVolumeSpi sourceVol = pathMap.get(sourceuuid); + if (sourceVol == null) { + LOG.error("Disk Balancer - Unable to find source volume. submitPlan " + + "failed."); + throw new DiskBalancerException("Unable to find source volume.", + DiskBalancerException.Result.INVALID_VOLUME); + } + + FsVolumeSpi destVol = pathMap.get(destinationuuid); + if (destVol == null) { + LOG.error("Disk Balancer - Unable to find destination volume. " + + "submitPlan failed."); + throw new DiskBalancerException("Unable to find destination volume.", + DiskBalancerException.Result.INVALID_VOLUME); + } + createWorkPlan(sourceVol, destVol, step.getBytesToMove()); + } + } + + /** + * Returns a path to Volume Map. + * + * @return Map + * @throws DiskBalancerException + */ + private Map getStorageIDToVolumeMap() + throws DiskBalancerException { + Map pathMap = new HashMap<>(); + FsDatasetSpi.FsVolumeReferences references; + try { + synchronized (this.dataset) { + references = this.dataset.getFsVolumeReferences(); + for (int ndx = 0; ndx < references.size(); ndx++) { + FsVolumeSpi vol = references.get(ndx); + pathMap.put(vol.getStorageID(), vol); + } + references.close(); + } + } catch (IOException ex) { + LOG.error("Disk Balancer - Internal Error.", ex); + throw new DiskBalancerException("Internal error", ex, + DiskBalancerException.Result.INTERNAL_ERROR); + } + return pathMap; + } + + /** + * Starts Executing the plan, exits when the plan is done executing. + */ + private void executePlan() { + Preconditions.checkState(lock.isHeldByCurrentThread()); + this.blockMover.setRunnable(); + if (this.scheduler.isShutdown()) { + this.scheduler = Executors.newSingleThreadExecutor(); + } + + this.future = scheduler.submit(new Runnable() { + @Override + public void run() { + Thread.currentThread().setName("DiskBalancerThread"); + LOG.info("Executing Disk balancer plan. Plan ID - " + planID); + + for (Map.Entry entry : + workMap.entrySet()) { + blockMover.copyBlocks(entry.getKey(), entry.getValue()); + } + } + }); + } + + /** + * Insert work items to work map. + * + * @param source - Source vol + * @param dest - destination volume + * @param bytesToMove - number of bytes to move + */ + private void createWorkPlan(FsVolumeSpi source, FsVolumeSpi dest, + long bytesToMove) throws DiskBalancerException { + + if(source.getStorageID().equals(dest.getStorageID())) { + throw new DiskBalancerException("Same source and destination", + DiskBalancerException.Result.INVALID_MOVE); + } + VolumePair pair = new VolumePair(source, dest); + + // In case we have a plan with more than + // one line of same + // we compress that into one work order. + if (workMap.containsKey(pair)) { + bytesToMove += workMap.get(pair).getBytesToCopy(); + } + + DiskBalancerWorkItem work = new DiskBalancerWorkItem(bytesToMove, 0); + workMap.put(pair, work); + } + + /** + * BlockMover supports moving blocks across Volumes. + */ + public interface BlockMover { + /** + * Copies blocks from a set of volumes. + * + * @param pair - Source and Destination Volumes. + * @param item - Number of bytes to move from volumes. + */ + void copyBlocks(VolumePair pair, DiskBalancerWorkItem item); + + /** + * Begin the actual copy operations. This is useful in testing. + */ + void setRunnable(); + + /** + * Tells copyBlocks to exit from the copy routine. + */ + void setExitFlag(); + + /** + * Returns a pointer to the current dataset we are operating against. + * + * @return FsDatasetSpi + */ + FsDatasetSpi getDataset(); + } + + /** + * Holds references to actual volumes that we will be operating against. + */ + static class VolumePair { + private final FsVolumeSpi source; + private final FsVolumeSpi dest; + + /** + * Constructs a volume pair. + * + * @param source - Source Volume + * @param dest - Destination Volume + */ + public VolumePair(FsVolumeSpi source, FsVolumeSpi dest) { + this.source = source; + this.dest = dest; + } + + /** + * gets source volume. + * + * @return volume + */ + public FsVolumeSpi getSource() { + return source; + } + + /** + * Gets Destination volume. + * + * @return volume. + */ + public FsVolumeSpi getDest() { + return dest; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + VolumePair that = (VolumePair) o; + return source.equals(that.source) && dest.equals(that.dest); + } + + @Override + public int hashCode() { + int result = source.getBasePath().hashCode(); + result = 31 * result + dest.getBasePath().hashCode(); + return result; + } + } + + /** + * Actual DataMover class for DiskBalancer. + *

+ * TODO : Add implementation for this class. This is here as a place holder so + * that Datanode can make calls into this class. + */ + public static class DiskBalancerMover implements BlockMover { + private final FsDatasetSpi dataset; + + /** + * Constructs diskBalancerMover. + * + * @param dataset Dataset + * @param conf Configuration + */ + public DiskBalancerMover(FsDatasetSpi dataset, Configuration conf) { + this.dataset = dataset; + // TODO : Read Config values. + } + + /** + * Copies blocks from a set of volumes. + * + * @param pair - Source and Destination Volumes. + * @param item - Number of bytes to move from volumes. + */ + @Override + public void copyBlocks(VolumePair pair, DiskBalancerWorkItem item) { + + } + + /** + * Begin the actual copy operations. This is useful in testing. + */ + @Override + public void setRunnable() { + + } + + /** + * Tells copyBlocks to exit from the copy routine. + */ + @Override + public void setExitFlag() { + + } + + /** + * Returns a pointer to the current dataset we are operating against. + * + * @return FsDatasetSpi + */ + @Override + public FsDatasetSpi getDataset() { + return dataset; + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskBalancerConstants.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskBalancerConstants.java index 553827ef15..7144a50151 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskBalancerConstants.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskBalancerConstants.java @@ -29,6 +29,15 @@ public final class DiskBalancerConstants { public static final String DISKBALANCER_VOLUME_NAME = "DiskBalancerVolumeName"; + /** Min and Max Plan file versions that we know of. **/ + public static final int DISKBALANCER_MIN_VERSION = 1; + public static final int DISKBALANCER_MAX_VERSION = 1; + + /** + * We treat a plan as stale if it was generated before the hours + * defined by the constant below. Defaults to 24 hours. + */ + public static final int DISKBALANCER_VALID_PLAN_HOURS = 24; // never constructed. private DiskBalancerConstants() { } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskbalancerException.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskBalancerException.java similarity index 54% rename from hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskbalancerException.java rename to hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskBalancerException.java index 9d47dc33c8..a5e158151a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskbalancerException.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskBalancerException.java @@ -1,18 +1,19 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

+ * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.hadoop.hdfs.server.diskbalancer; @@ -21,17 +22,31 @@ import java.io.IOException; /** * Disk Balancer Exceptions. */ -public class DiskbalancerException extends IOException { - private int result; +public class DiskBalancerException extends IOException { + /** Possible results from DiskBalancer. **/ + public enum Result { + DISK_BALANCER_NOT_ENABLED, + INVALID_PLAN_VERSION, + INVALID_PLAN, + INVALID_PLAN_HASH, + OLD_PLAN_SUBMITTED, + DATANODE_ID_MISMATCH, + MALFORMED_PLAN, + PLAN_ALREADY_IN_PROGRESS, + INVALID_VOLUME, + INVALID_MOVE, + INTERNAL_ERROR + } + + private final Result result; /** * Constructs an {@code IOException} with the specified detail message. * * @param message The detail message (which is saved for later retrieval by - * the - * {@link #getMessage()} method) + * the {@link #getMessage()} method) */ - public DiskbalancerException(String message, int result) { + public DiskBalancerException(String message, Result result) { super(message); this.result = result; } @@ -50,9 +65,8 @@ public class DiskbalancerException extends IOException { * @param cause The cause (which is saved for later retrieval by the {@link * #getCause()} method). (A null value is permitted, and * indicates that the cause is nonexistent or unknown.) - * @since 1.6 */ - public DiskbalancerException(String message, Throwable cause, int result) { + public DiskBalancerException(String message, Throwable cause, Result result) { super(message, cause); this.result = result; } @@ -61,17 +75,15 @@ public class DiskbalancerException extends IOException { * Constructs an {@code IOException} with the specified cause and a detail * message of {@code (cause==null ? null : cause.toString())} (which typically * contains the class and detail message of {@code cause}). This - * constructor is - * useful for IO exceptions that are little more than wrappers for other - * throwables. + * constructor is useful for IO exceptions that are little more than + * wrappers for other throwables. * * @param cause The cause (which is saved for later retrieval by the {@link * #getCause()} method). (A null value is permitted, and * indicates * that the cause is nonexistent or unknown.) - * @since 1.6 */ - public DiskbalancerException(Throwable cause, int result) { + public DiskBalancerException(Throwable cause, Result result) { super(cause); this.result = result; } @@ -80,7 +92,7 @@ public class DiskbalancerException extends IOException { * Returns the result. * @return int */ - public int getResult() { + public Result getResult() { return result; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerCluster.java index af9e9afbf8..c86fc9af8f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerCluster.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerCluster.java @@ -358,4 +358,18 @@ public class DiskBalancerCluster { return (10 - modValue) + threadRatio; } } + + /** + * Returns a node by UUID. + * @param uuid - Node's UUID + * @return DiskBalancerDataNode. + */ + public DiskBalancerDataNode getNodeByUUID(String uuid) { + for(DiskBalancerDataNode node : this.getNodes()) { + if(node.getDataNodeUUID().equals(uuid)) { + return node; + } + } + return null; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerRPC.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerRPC.java index 143b776a1a..dc24787de3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerRPC.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerRPC.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.diskbalancer; import org.apache.commons.codec.digest.DigestUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.server.datanode.DataNode; @@ -35,9 +36,6 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; -import java.io.IOException; -import java.net.URI; - public class TestDiskBalancerRPC { @Rule public ExpectedException thrown = ExpectedException.none(); @@ -48,6 +46,7 @@ public class TestDiskBalancerRPC { @Before public void setUp() throws Exception { conf = new HdfsConfiguration(); + conf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true); cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build(); cluster.waitActive(); } @@ -72,22 +71,19 @@ public class TestDiskBalancerRPC { Assert.assertEquals(cluster.getDataNodes().size(), diskBalancerCluster.getNodes().size()); diskBalancerCluster.setNodesToProcess(diskBalancerCluster.getNodes()); - DiskBalancerDataNode node = diskBalancerCluster.getNodes().get(dnIndex); + + DataNode dataNode = cluster.getDataNodes().get(dnIndex); + DiskBalancerDataNode node = diskBalancerCluster.getNodeByUUID( + dataNode.getDatanodeUuid()); GreedyPlanner planner = new GreedyPlanner(10.0f, node); NodePlan plan = new NodePlan(node.getDataNodeName(), node.getDataNodePort ()); planner.balanceVolumeSet(node, node.getVolumeSets().get("DISK"), plan); - final int planVersion = 0; // So far we support only one version. - DataNode dataNode = cluster.getDataNodes().get(dnIndex); + final int planVersion = 1; // So far we support only one version. String planHash = DigestUtils.sha512Hex(plan.toJson()); - // Since submitDiskBalancerPlan is not implemented yet, it throws an - // Exception, this will be modified with the actual implementation. - thrown.expect(DiskbalancerException.class); dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, plan.toJson()); - - } @Test @@ -117,10 +113,10 @@ public class TestDiskBalancerRPC { // Exception, this will be modified with the actual implementation. try { dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, plan.toJson()); - } catch (DiskbalancerException ex) { + } catch (DiskBalancerException ex) { // Let us ignore this for time being. } - thrown.expect(DiskbalancerException.class); + thrown.expect(DiskBalancerException.class); dataNode.cancelDiskBalancePlan(planHash); } @@ -152,13 +148,13 @@ public class TestDiskBalancerRPC { // Exception, this will be modified with the actual implementation. try { dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, plan.toJson()); - } catch (DiskbalancerException ex) { + } catch (DiskBalancerException ex) { // Let us ignore this for time being. } // TODO : This will be fixed when we have implementation for this // function in server side. - thrown.expect(DiskbalancerException.class); + thrown.expect(DiskBalancerException.class); dataNode.queryDiskBalancerPlan(); } @@ -166,7 +162,7 @@ public class TestDiskBalancerRPC { public void testgetDiskBalancerSetting() throws Exception { final int dnIndex = 0; DataNode dataNode = cluster.getDataNodes().get(dnIndex); - thrown.expect(DiskbalancerException.class); + thrown.expect(DiskBalancerException.class); dataNode.getDiskBalancerSetting( DiskBalancerConstants.DISKBALANCER_BANDWIDTH); }