From c3b235e56597d55387b4003e376faee10b473d55 Mon Sep 17 00:00:00 2001
From: Arpit Agarwal
Date: Wed, 28 Sep 2016 11:47:37 -0700
Subject: [PATCH] HDFS-10824. MiniDFSCluster#storageCapacities has no effects on real capacity. Contributed by Xiaobing Zhou.

---
 .../apache/hadoop/hdfs/MiniDFSCluster.java    | 105 ++++++++++++----
 .../hadoop/hdfs/TestMiniDFSCluster.java       | 119 ++++++++++++++++++
 2 files changed, 200 insertions(+), 24 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
index 3bb3a10fe5..cf02a8de84 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
@@ -56,6 +56,7 @@ import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
@@ -547,6 +548,8 @@ public void setDnArgs(String ... args) {
   protected final int storagesPerDatanode;
   private Set<FileSystem> fileSystems = Sets.newHashSet();
 
+  private List<long[]> storageCap = Lists.newLinkedList();
+
   /**
    * A unique instance identifier for the cluster. This
    * is used to disambiguate HA filesystems in the case where
@@ -1648,31 +1651,64 @@ public synchronized void startDataNodes(Configuration conf, int numDataNodes,
     }
     this.numDataNodes += numDataNodes;
     waitActive();
-
-    if (storageCapacities != null) {
-      for (int i = curDatanodesNum; i < curDatanodesNum+numDataNodes; ++i) {
-        final int index = i - curDatanodesNum;
-        try (FsDatasetSpi.FsVolumeReferences volumes =
-            dns[index].getFSDataset().getFsVolumeReferences()) {
-          assert storageCapacities[index].length == storagesPerDatanode;
-          assert volumes.size() == storagesPerDatanode;
-          int j = 0;
-          for (FsVolumeSpi fvs : volumes) {
-            FsVolumeImpl volume = (FsVolumeImpl) fvs;
-            LOG.info("setCapacityForTesting " + storageCapacities[index][j]
-                + " for [" + volume.getStorageType() + "]" + volume
-                .getStorageID());
-            volume.setCapacityForTesting(storageCapacities[index][j]);
-            j++;
-          }
-        }
+    setDataNodeStorageCapacities(
+        curDatanodesNum,
+        numDataNodes,
+        dns,
+        storageCapacities);
+
+    /* memorize storage capacities */
+    if (storageCapacities != null) {
+      storageCap.addAll(Arrays.asList(storageCapacities));
+    }
+  }
+
+  private synchronized void setDataNodeStorageCapacities(
+      final int curDatanodesNum,
+      final int numDNs,
+      final DataNode[] dns,
+      long[][] storageCapacities) throws IOException {
+    if (storageCapacities != null) {
+      for (int i = curDatanodesNum; i < curDatanodesNum + numDNs; ++i) {
+        final int index = i - curDatanodesNum;
+        setDataNodeStorageCapacities(index, dns[index], storageCapacities);
       }
     }
   }
-
-
-
+
+  private synchronized void setDataNodeStorageCapacities(
+      final int curDnIdx,
+      final DataNode curDn,
+      long[][] storageCapacities) throws IOException {
+
+    if (storageCapacities == null || storageCapacities.length == 0) {
+      return;
+    }
+
+    try {
+      waitDataNodeFullyStarted(curDn);
+    } catch (TimeoutException | InterruptedException e) {
+      throw new IOException(e);
+    }
+
+    try (FsDatasetSpi.FsVolumeReferences volumes = curDn.getFSDataset()
+        .getFsVolumeReferences()) {
+      assert storageCapacities[curDnIdx].length == storagesPerDatanode;
+      assert volumes.size() == storagesPerDatanode;
+
+      int j = 0;
+      for (FsVolumeSpi fvs : volumes) {
+        FsVolumeImpl volume = (FsVolumeImpl) fvs;
+        LOG.info("setCapacityForTesting " + storageCapacities[curDnIdx][j]
+            + " for [" + volume.getStorageType() + "]" + volume.getStorageID());
+        volume.setCapacityForTesting(storageCapacities[curDnIdx][j]);
+        j++;
+      }
+    }
+    DataNodeTestUtils.triggerHeartbeat(curDn);
+  }
+
   /**
    * Modify the config and start up the DataNodes. The info port for
    * DataNodes is guaranteed to use a free port.
@@ -2236,6 +2272,16 @@ public boolean restartDataNode(DataNodeProperties dnprop) throws IOException {
     return restartDataNode(dnprop, false);
   }
 
+  private void waitDataNodeFullyStarted(final DataNode dn)
+      throws TimeoutException, InterruptedException {
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        return dn.isDatanodeFullyStarted();
+      }
+    }, 100, 60000);
+  }
+
   /**
    * Restart a datanode, on the same port if requested
    * @param dnprop the datanode to restart
@@ -2256,10 +2302,21 @@ public synchronized boolean restartDataNode(DataNodeProperties dnprop,
       conf.set(DFS_DATANODE_IPC_ADDRESS_KEY,
           addr.getAddress().getHostAddress() + ":" + dnprop.ipcPort);
     }
-    DataNode newDn = DataNode.createDataNode(args, conf, secureResources);
-    dataNodes.add(new DataNodeProperties(
-        newDn, newconf, args, secureResources, newDn.getIpcPort()));
+    final DataNode newDn = DataNode.createDataNode(args, conf, secureResources);
+
+    final DataNodeProperties dnp = new DataNodeProperties(
+        newDn,
+        newconf,
+        args,
+        secureResources,
+        newDn.getIpcPort());
+    dataNodes.add(dnp);
     numDataNodes++;
+
+    setDataNodeStorageCapacities(
+        dataNodes.lastIndexOf(dnp),
+        newDn,
+        storageCap.toArray(new long[][]{}));
     return true;
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMiniDFSCluster.java
index 4d027dcfd9..3d4cc72859 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMiniDFSCluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMiniDFSCluster.java
@@ -25,16 +25,25 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.MiniDFSCluster.NameNodeInfo;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
 import org.apache.hadoop.test.PathUtils;
 import org.junit.Before;
 import org.junit.Test;
 
+import com.google.common.base.Preconditions;
+
 /**
  * Tests MiniDFS cluster setup/teardown and isolation.
  * Every instance is brought up with a new data dir, to ensure that
@@ -78,6 +87,116 @@ public void testClusterWithoutSystemProperties() throws Throwable {
     }
   }
 
+  /**
+   * Tests storage capacity setting still effective after cluster restart.
+   */
+  @Test(timeout=100000)
+  public void testClusterSetStorageCapacity() throws Throwable {
+
+    final Configuration conf = new HdfsConfiguration();
+    final int numDatanodes = 1;
+    final int defaultBlockSize = 1024;
+    final int blocks = 100;
+    final int blocksSize = 1024;
+    final int fileLen = blocks * blocksSize;
+    final long capcacity = defaultBlockSize * 2 * fileLen;
+    final long[] capacities = new long[] {capcacity, 2 * capcacity};
+
+    final MiniDFSCluster cluster = newCluster(
+        conf,
+        numDatanodes,
+        capacities,
+        defaultBlockSize,
+        fileLen);
+    verifyStorageCapacity(cluster, capacities);
+
+    /* restart all data nodes */
+    cluster.restartDataNodes();
+    cluster.waitActive();
+    verifyStorageCapacity(cluster, capacities);
+
+    /* restart all name nodes */
+    cluster.restartNameNodes();
+    cluster.waitActive();
+    verifyStorageCapacity(cluster, capacities);
+
+    /* restart all name nodes firstly and data nodes then */
+    cluster.restartNameNodes();
+    cluster.restartDataNodes();
+    cluster.waitActive();
+    verifyStorageCapacity(cluster, capacities);
+
+    /* restart all data nodes firstly and name nodes then */
+    cluster.restartDataNodes();
+    cluster.restartNameNodes();
+    cluster.waitActive();
+    verifyStorageCapacity(cluster, capacities);
+  }
+
+  private void verifyStorageCapacity(
+      final MiniDFSCluster cluster,
+      final long[] capacities) throws IOException {
+
+    FsVolumeImpl source = null;
+    FsVolumeImpl dest = null;
+
+    /* verify capacity */
+    for (int i = 0; i < cluster.getDataNodes().size(); i++) {
+      final DataNode dnNode = cluster.getDataNodes().get(i);
+      try (FsDatasetSpi.FsVolumeReferences refs = dnNode.getFSDataset()
+          .getFsVolumeReferences()) {
+        source = (FsVolumeImpl) refs.get(0);
+        dest = (FsVolumeImpl) refs.get(1);
+        assertEquals(capacities[0], source.getCapacity());
+        assertEquals(capacities[1], dest.getCapacity());
+      }
+    }
+  }
+
+  private MiniDFSCluster newCluster(
+      final Configuration conf,
+      final int numDatanodes,
+      final long[] storageCapacities,
+      final int defaultBlockSize,
+      final int fileLen)
+      throws IOException, InterruptedException, TimeoutException {
+
+    conf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true);
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, defaultBlockSize);
+    conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, defaultBlockSize);
+    conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
+
+    final String fileName = "/" + UUID.randomUUID().toString();
+    final Path filePath = new Path(fileName);
+
+    Preconditions.checkNotNull(storageCapacities);
+    Preconditions.checkArgument(
+        storageCapacities.length == 2,
+        "need to specify capacities for two storages.");
+
+    /* Write a file and restart the cluster */
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(numDatanodes)
+        .storageCapacities(storageCapacities)
+        .storageTypes(new StorageType[]{StorageType.DISK, StorageType.DISK})
+        .storagesPerDatanode(2)
+        .build();
+    cluster.waitActive();
+
+    final short replicationFactor = (short) 1;
+    final Random r = new Random();
+    FileSystem fs = cluster.getFileSystem(0);
+    DFSTestUtil.createFile(
+        fs,
+        filePath,
+        fileLen,
+        replicationFactor,
+        r.nextLong());
+    DFSTestUtil.waitReplication(fs, filePath, replicationFactor);
+
+    return cluster;
+  }
+
   @Test(timeout=100000)
   public void testIsClusterUpAfterShutdown() throws Throwable {
     Configuration conf = new HdfsConfiguration();