From 30c6ebd69919a477a582e599fb253ffe5c2982e1 Mon Sep 17 00:00:00 2001
From: Tsz-Wo Nicholas Sze <szetszwo@apache.org>
Date: Tue, 1 Dec 2015 14:43:06 -0800
Subject: [PATCH] HDFS-9449. DiskBalancer: Add connectors. Contributed by Anu Engineer

---
 .../hadoop-hdfs/HDFS-1312_CHANGES.txt         |   2 +
 .../connectors/ConnectorFactory.java          |  54 ++++++
 .../connectors/DBNameNodeConnector.java       | 162 ++++++++++++++++++
 .../connectors/JsonNodeConnector.java         |  77 +++++++++
 .../diskbalancer/connectors/package-info.java |  10 +-
 .../server/diskbalancer/TestConnectors.java   |  82 +++++++++
 6 files changed, 386 insertions(+), 1 deletion(-)
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/connectors/ConnectorFactory.java
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/connectors/DBNameNodeConnector.java
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/connectors/JsonNodeConnector.java
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestConnectors.java

diff --git a/hadoop-hdfs-project/hadoop-hdfs/HDFS-1312_CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/HDFS-1312_CHANGES.txt
index 5a7103276f..cad8e490b5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/HDFS-1312_CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/HDFS-1312_CHANGES.txt
@@ -3,3 +3,5 @@ HDFS-1312 Change Log
   NEW FEATURES
 
     HDFS-9420. Add DataModels for DiskBalancer. (Anu Engineer via szetszwo)
+
+    HDFS-9449. DiskBalancer: Add connectors. (Anu Engineer via szetszwo)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/connectors/ConnectorFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/connectors/ConnectorFactory.java
new file mode 100644
index 0000000000..040923acf2
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/connectors/ConnectorFactory.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdfs.server.diskbalancer.connectors;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+/**
+ * Connector factory creates appropriate connector based on the URL.
+ */
+public final class ConnectorFactory {
+  static final Log LOG = LogFactory.getLog(ConnectorFactory.class);
+
+  /**
+   * Constructs an appropriate connector based on the URL.
+   * @param clusterURI - URL
+   * @return ClusterConnector
+   */
+  public static ClusterConnector getCluster(URI clusterURI, Configuration
+      conf) throws IOException, URISyntaxException {
+    LOG.info("Cluster URI : " + clusterURI);
+    LOG.info("scheme : " + clusterURI.getScheme());
+    if (clusterURI.getScheme().startsWith("file")) {
+      LOG.info("Creating a JsonNodeConnector");
+      return new JsonNodeConnector(clusterURI.toURL());
+    } else {
+      LOG.info("Creating NameNode connector");
+      return new DBNameNodeConnector(clusterURI, conf);
+    }
+  }
+
+  private ConnectorFactory() {
+    // never constructed
+  }
+}
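
The factory above keys off the URI scheme: "file" replays a saved JSON cluster model through JsonNodeConnector, and any other scheme is handed to DBNameNodeConnector, which talks to a live NameNode. A minimal usage sketch follows; it is illustrative only and not part of this patch. The URIs and the wrapper class name are invented, while ConnectorFactory, ClusterConnector, and getConnectorInfo() are defined by this change.

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ClusterConnector;
import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ConnectorFactory;

public class ConnectorFactoryUsage {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();

    // "file" scheme: read a previously saved cluster model (path is made up).
    ClusterConnector fromFile =
        ConnectorFactory.getCluster(new URI("file:///tmp/cluster.json"), conf);
    System.out.println(fromFile.getConnectorInfo());

    // Any other scheme: connect to a live NameNode (host/port are made up).
    ClusterConnector fromNameNode = ConnectorFactory.getCluster(
        new URI("hdfs://namenode.example.com:8020"), conf);
    System.out.println(fromNameNode.getConnectorInfo());
  }
}
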
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/connectors/DBNameNodeConnector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/connectors/DBNameNodeConnector.java
new file mode 100644
index 0000000000..c35e934676
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/connectors/DBNameNodeConnector.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdfs.server.diskbalancer.connectors;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
+import org.apache.hadoop.hdfs.server.diskbalancer.datamodel
+    .DiskBalancerDataNode;
+import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerVolume;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
+import org.apache.hadoop.hdfs.server.protocol.StorageReport;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * DBNameNodeConnector connects to Namenode and extracts information from a
+ * given cluster.
+ */
+class DBNameNodeConnector implements ClusterConnector {
+  static final Log LOG = LogFactory.getLog(DBNameNodeConnector.class);
+  static final Path DISKBALANCER_ID_PATH = new Path("/system/diskbalancer.id");
+  private final URI clusterURI;
+  private final NameNodeConnector connector;
+
+  /**
+   * Constructs a DBNameNodeConnector.
+   *
+   * @param clusterURI - URL to connect to.
+   */
+  public DBNameNodeConnector(URI clusterURI, Configuration conf) throws
+      IOException, URISyntaxException {
+
+    // we don't care how many instances of disk balancers run.
+    // The admission is controlled at the data node, where we will
+    // execute only one plan at a given time.
+    NameNodeConnector.setWrite2IdFile(false);
+
+    try {
+      connector = new NameNodeConnector("DiskBalancer",
+          clusterURI, DISKBALANCER_ID_PATH, null, conf, 1);
+    } catch (IOException ex) {
+      LOG.error("Unable to connect to NameNode " + ex.toString());
+      throw ex;
+    }
+
+    this.clusterURI = clusterURI;
+  }
+
+  /**
+   * getNodes function returns a list of DiskBalancerDataNodes.
+   *
+   * @return Array of DiskBalancerDataNodes
+   */
+  @Override
+  public List<DiskBalancerDataNode> getNodes() throws Exception {
+    Preconditions.checkNotNull(this.connector);
+    List<DiskBalancerDataNode> nodeList = new LinkedList<>();
+    DatanodeStorageReport[] reports = this.connector
+        .getLiveDatanodeStorageReport();
+
+    for (DatanodeStorageReport report : reports) {
+      DiskBalancerDataNode datanode = getBalancerNodeFromDataNode(
+          report.getDatanodeInfo());
+      getVolumeInfoFromStorageReports(datanode, report.getStorageReports());
+      nodeList.add(datanode);
+    }
+    return nodeList;
+  }
+
+  /**
+   * Returns info about the connector.
+   *
+   * @return String.
+   */
+  @Override
+  public String getConnectorInfo() {
+    return "Name Node Connector : " + clusterURI.toString();
+  }
+
+  /**
+   * This function maps the required fields from DataNodeInfo to disk
+   * BalancerDataNode. <p/>
+   *
+   * @param nodeInfo
+   * @return DiskBalancerDataNode
+   */
+  private DiskBalancerDataNode
+      getBalancerNodeFromDataNode(DatanodeInfo nodeInfo) {
+    Preconditions.checkNotNull(nodeInfo);
+    DiskBalancerDataNode dbDataNode = new DiskBalancerDataNode(nodeInfo
+        .getDatanodeUuid());
+    dbDataNode.setDataNodeIP(nodeInfo.getIpAddr());
+    dbDataNode.setDataNodeName(nodeInfo.getHostName());
+    dbDataNode.setDataNodePort(nodeInfo.getIpcPort());
+    return dbDataNode;
+  }
+
+  /**
+   * Reads the relevant fields from each storage volume and populate the
+   * DiskBalancer Node.
+   *
+   * @param node - Disk Balancer Node
+   * @param reports - Array of StorageReport
+   */
+  private void getVolumeInfoFromStorageReports(DiskBalancerDataNode node,
+                                               StorageReport[] reports)
+      throws Exception {
+    Preconditions.checkNotNull(node);
+    Preconditions.checkNotNull(reports);
+    for (StorageReport report : reports) {
+      DatanodeStorage storage = report.getStorage();
+      DiskBalancerVolume volume = new DiskBalancerVolume();
+      volume.setCapacity(report.getCapacity());
+      volume.setFailed(report.isFailed());
+      volume.setUsed(report.getDfsUsed());
+
+      // TODO : Should we do BlockPool level balancing at all ?
+      // Does it make sense ? Balancer does do that. Right now
+      // we only deal with volumes and not blockPools
+
+      volume.setUsed(report.getDfsUsed());
+
+      volume.setUuid(storage.getStorageID());
+
+      // we will skip this volume for disk balancer if
+      // it is read-only since we will not be able to delete
+      // or if it is already failed.
+      volume.setSkip((storage.getState() == DatanodeStorage.State
+          .READ_ONLY_SHARED) || report.isFailed());
+      volume.setStorageType(storage.getStorageType().name());
+      volume.setIsTransient(storage.getStorageType().isTransient());
+      //volume.setPath(storage.getVolumePath());
+      node.addVolume(volume);
+    }
+
+  }
+}
\ No newline at end of file
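
A sketch of how the node list built by getNodes() might be consumed. getNodes() and the setters above are defined by this patch, and getVolumeCount() is exercised by TestConnectors below; getDataNodeName(), however, is only assumed to mirror setDataNodeName() and should be treated as hypothetical.

import java.net.URI;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ClusterConnector;
import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ConnectorFactory;
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerDataNode;

public class PrintClusterNodes {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // NameNode URI is made up for illustration.
    ClusterConnector connector = ConnectorFactory.getCluster(
        new URI("hdfs://namenode.example.com:8020"), conf);

    // One DiskBalancerDataNode per live datanode, populated from the
    // storage reports by the code above.
    List<DiskBalancerDataNode> nodes = connector.getNodes();
    for (DiskBalancerDataNode node : nodes) {
      // getDataNodeName() is an assumed getter, not confirmed by this patch.
      System.out.println(node.getDataNodeName() + " reports "
          + node.getVolumeCount() + " volume(s)");
    }
  }
}
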
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/connectors/JsonNodeConnector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/connectors/JsonNodeConnector.java
new file mode 100644
index 0000000000..bf5aebb036
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/connectors/JsonNodeConnector.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdfs.server.diskbalancer.connectors;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerCluster;
+import org.apache.hadoop.hdfs.server.diskbalancer.datamodel
+    .DiskBalancerDataNode;
+import org.codehaus.jackson.map.ObjectMapper;
+
+import java.io.File;
+import java.net.URL;
+import java.util.List;
+
+/**
+ * A connector that understands JSON data cluster models.
+ */
+public class JsonNodeConnector implements ClusterConnector {
+  static final Log LOG = LogFactory.getLog(JsonNodeConnector.class);
+  private final URL clusterURI;
+
+  /**
+   * Constructs a JsonNodeConnector.
+   * @param clusterURI - A file URL that contains cluster information.
+   */
+  public JsonNodeConnector(URL clusterURI) {
+    this.clusterURI = clusterURI;
+  }
+
+  /**
+   * getNodes function connects to a cluster definition file
+   * and returns nodes defined in that file.
+   *
+   * @return Array of DiskBalancerDataNodes
+   */
+  @Override
+  public List<DiskBalancerDataNode> getNodes() throws Exception {
+    Preconditions.checkNotNull(this.clusterURI);
+    String dataFilePath = this.clusterURI.getPath();
+    LOG.info("Reading cluster info from file : " + dataFilePath);
+    ObjectMapper mapper = new ObjectMapper();
+    DiskBalancerCluster cluster =
+        mapper.readValue(new File(dataFilePath), DiskBalancerCluster.class);
+    String message = String.format("Found %d node(s)",
+        cluster.getNodes().size());
+    LOG.info(message);
+    return cluster.getNodes();
+  }
+
+  /**
+   * Returns info about the connector.
+   *
+   * @return String.
+   */
+  @Override
+  public String getConnectorInfo() {
+    return "Json Cluster Connector : Connects to a JSON file that describes a"
+        + " cluster : " + clusterURI.toString();
+  }
+}
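
The JSON files this connector reads are whatever DiskBalancerCluster serializes, so a model captured from a live cluster can be replayed offline. A hedged round-trip sketch follows, assuming only readClusterInfo() and toJson() (both exercised by TestConnectors below) plus standard java.nio file I/O; the temp-file handling is an assumption, not part of the patch.

import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ClusterConnector;
import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ConnectorFactory;
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerCluster;

public class SnapshotCluster {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();

    // Capture the model of a live cluster (NameNode URI is made up).
    ClusterConnector live = ConnectorFactory.getCluster(
        new URI("hdfs://namenode.example.com:8020"), conf);
    DiskBalancerCluster model = new DiskBalancerCluster(live);
    model.readClusterInfo();

    // Persist the model; a "file" URI then routes it back through
    // JsonNodeConnector.
    Path snapshot = Files.createTempFile("diskbalancer-cluster", ".json");
    Files.write(snapshot, model.toJson().getBytes(StandardCharsets.UTF_8));

    ClusterConnector replay =
        ConnectorFactory.getCluster(snapshot.toUri(), conf);
    System.out.println(replay.getConnectorInfo());
  }
}

One import clash is worth avoiding in real code: org.apache.hadoop.fs.Path and java.nio.file.Path share a simple name, so qualify one of them whenever both are needed in the same file.
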
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/connectors/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/connectors/package-info.java
index 8164804c33..b4b4437aea 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/connectors/package-info.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/connectors/package-info.java
@@ -21,7 +21,15 @@ package org.apache.hadoop.hdfs.server.diskbalancer.connectors;
  * Connectors package is a set of logical connectors that connect
  * to various data sources to read the hadoop cluster information.
  *
- * We currently have 1 connector in this package. it is
+ * We currently have 3 connectors in this package. They are
+ *
+ * DBNameNodeConnector - This uses the connector from the original
+ * balancer package to connect to a real hadoop cluster.
+ *
+ * JsonNodeConnector - This connects to a file and reads the data about a
+ * cluster. We can generate a cluster json from a real cluster using
+ * the diskBalancer tool or hand-craft it. There are some sample Json files
+ * checked in under test/resources/diskBalancer directory.
  *
  * NullConnector - This is an in-memory connector that is useful in testing.
  * we can crate dataNodes on the fly and attach to this connector and
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestConnectors.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestConnectors.java
new file mode 100644
index 0000000000..bec488ef97
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestConnectors.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdfs.server.diskbalancer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ClusterConnector;
+import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ConnectorFactory;
+import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerCluster;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+public class TestConnectors {
+  private MiniDFSCluster cluster;
+  final int numDatanodes = 3;
+  final int volumeCount = 2; // default volumes in MiniDFSCluster.
+  Configuration conf;
+
+  @Before
+  public void setup() throws IOException {
+    conf = new HdfsConfiguration();
+    cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(numDatanodes).build();
+  }
+
+  @After
+  public void teardown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void TestNameNodeConnector() throws Exception {
+    cluster.waitActive();
+    ClusterConnector nameNodeConnector =
+        ConnectorFactory.getCluster(cluster.getFileSystem(0).getUri(), conf);
+    DiskBalancerCluster diskBalancerCluster = new DiskBalancerCluster
+        (nameNodeConnector);
+    diskBalancerCluster.readClusterInfo();
+    Assert.assertEquals("Expected number of Datanodes not found.",
+        numDatanodes, diskBalancerCluster.getNodes().size());
+    Assert.assertEquals("Expected number of volumes not found.",
+        volumeCount, diskBalancerCluster.getNodes().get(0).getVolumeCount());
+  }
+
+  @Test
+  public void TestJsonConnector() throws Exception {
+    cluster.waitActive();
+    ClusterConnector nameNodeConnector =
+        ConnectorFactory.getCluster(cluster.getFileSystem(0).getUri(), conf);
+    DiskBalancerCluster diskBalancerCluster = new DiskBalancerCluster
+        (nameNodeConnector);
+    diskBalancerCluster.readClusterInfo();
+    String diskBalancerJson = diskBalancerCluster.toJson();
+    DiskBalancerCluster serializedCluster = DiskBalancerCluster.parseJson
+        (diskBalancerJson);
+    Assert.assertEquals("Parsed cluster is not equal to persisted info.",
+        diskBalancerCluster.getNodes().size(), serializedCluster.getNodes()
+            .size());
+  }
+}
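
Note that TestJsonConnector above round-trips the model through toJson()/parseJson() in memory and never instantiates JsonNodeConnector itself. A hedged sketch of a file-based variant that would exercise the "file" path of ConnectorFactory end-to-end follows; the test name and temp-file handling are assumptions, while everything else reuses members already defined in the class above.

  // Hypothetical additional test, not part of this patch.
  @Test
  public void TestJsonConnectorFromFile() throws Exception {
    cluster.waitActive();
    ClusterConnector liveConnector =
        ConnectorFactory.getCluster(cluster.getFileSystem(0).getUri(), conf);
    DiskBalancerCluster liveCluster = new DiskBalancerCluster(liveConnector);
    liveCluster.readClusterInfo();

    // Write the serialized model to a temp file (assumed helper code), then
    // read it back through a file: URI so the factory selects
    // JsonNodeConnector.
    java.nio.file.Path jsonFile =
        java.nio.file.Files.createTempFile("diskbalancer", ".json");
    java.nio.file.Files.write(jsonFile,
        liveCluster.toJson().getBytes(java.nio.charset.StandardCharsets.UTF_8));

    ClusterConnector jsonConnector =
        ConnectorFactory.getCluster(jsonFile.toUri(), conf);
    Assert.assertEquals("Node count mismatch after file round-trip.",
        liveCluster.getNodes().size(), jsonConnector.getNodes().size());
  }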