HDFS-10824. MiniDFSCluster#storageCapacities has no effects on real capacity. Contributed by Xiaobing Zhou.

This commit is contained in:
Arpit Agarwal 2016-09-28 11:47:37 -07:00
parent e19b37ead2
commit c3b235e565
2 changed files with 200 additions and 24 deletions

View File

@ -56,6 +56,7 @@
import java.net.URI; import java.net.URI;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -547,6 +548,8 @@ public void setDnArgs(String ... args) {
protected final int storagesPerDatanode; protected final int storagesPerDatanode;
private Set<FileSystem> fileSystems = Sets.newHashSet(); private Set<FileSystem> fileSystems = Sets.newHashSet();
private List<long[]> storageCap = Lists.newLinkedList();
/** /**
* A unique instance identifier for the cluster. This * A unique instance identifier for the cluster. This
* is used to disambiguate HA filesystems in the case where * is used to disambiguate HA filesystems in the case where
@ -1649,29 +1652,62 @@ public synchronized void startDataNodes(Configuration conf, int numDataNodes,
this.numDataNodes += numDataNodes; this.numDataNodes += numDataNodes;
waitActive(); waitActive();
setDataNodeStorageCapacities(
curDatanodesNum,
numDataNodes,
dns,
storageCapacities);
/* memorize storage capacities */
if (storageCapacities != null) { if (storageCapacities != null) {
for (int i = curDatanodesNum; i < curDatanodesNum+numDataNodes; ++i) { storageCap.addAll(Arrays.asList(storageCapacities));
}
}
private synchronized void setDataNodeStorageCapacities(
final int curDatanodesNum,
final int numDNs,
final DataNode[] dns,
long[][] storageCapacities) throws IOException {
if (storageCapacities != null) {
for (int i = curDatanodesNum; i < curDatanodesNum + numDNs; ++i) {
final int index = i - curDatanodesNum; final int index = i - curDatanodesNum;
try (FsDatasetSpi.FsVolumeReferences volumes = setDataNodeStorageCapacities(index, dns[index], storageCapacities);
dns[index].getFSDataset().getFsVolumeReferences()) { }
assert storageCapacities[index].length == storagesPerDatanode; }
}
private synchronized void setDataNodeStorageCapacities(
final int curDnIdx,
final DataNode curDn,
long[][] storageCapacities) throws IOException {
if (storageCapacities == null || storageCapacities.length == 0) {
return;
}
try {
waitDataNodeFullyStarted(curDn);
} catch (TimeoutException | InterruptedException e) {
throw new IOException(e);
}
try (FsDatasetSpi.FsVolumeReferences volumes = curDn.getFSDataset()
.getFsVolumeReferences()) {
assert storageCapacities[curDnIdx].length == storagesPerDatanode;
assert volumes.size() == storagesPerDatanode; assert volumes.size() == storagesPerDatanode;
int j = 0; int j = 0;
for (FsVolumeSpi fvs : volumes) { for (FsVolumeSpi fvs : volumes) {
FsVolumeImpl volume = (FsVolumeImpl) fvs; FsVolumeImpl volume = (FsVolumeImpl) fvs;
LOG.info("setCapacityForTesting " + storageCapacities[index][j] LOG.info("setCapacityForTesting " + storageCapacities[curDnIdx][j]
+ " for [" + volume.getStorageType() + "]" + volume + " for [" + volume.getStorageType() + "]" + volume.getStorageID());
.getStorageID()); volume.setCapacityForTesting(storageCapacities[curDnIdx][j]);
volume.setCapacityForTesting(storageCapacities[index][j]);
j++; j++;
} }
} }
DataNodeTestUtils.triggerHeartbeat(curDn);
} }
}
}
/** /**
* Modify the config and start up the DataNodes. The info port for * Modify the config and start up the DataNodes. The info port for
@ -2236,6 +2272,16 @@ public boolean restartDataNode(DataNodeProperties dnprop) throws IOException {
return restartDataNode(dnprop, false); return restartDataNode(dnprop, false);
} }
private void waitDataNodeFullyStarted(final DataNode dn)
throws TimeoutException, InterruptedException {
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
return dn.isDatanodeFullyStarted();
}
}, 100, 60000);
}
/** /**
* Restart a datanode, on the same port if requested * Restart a datanode, on the same port if requested
* @param dnprop the datanode to restart * @param dnprop the datanode to restart
@ -2256,10 +2302,21 @@ public synchronized boolean restartDataNode(DataNodeProperties dnprop,
conf.set(DFS_DATANODE_IPC_ADDRESS_KEY, conf.set(DFS_DATANODE_IPC_ADDRESS_KEY,
addr.getAddress().getHostAddress() + ":" + dnprop.ipcPort); addr.getAddress().getHostAddress() + ":" + dnprop.ipcPort);
} }
DataNode newDn = DataNode.createDataNode(args, conf, secureResources); final DataNode newDn = DataNode.createDataNode(args, conf, secureResources);
dataNodes.add(new DataNodeProperties(
newDn, newconf, args, secureResources, newDn.getIpcPort())); final DataNodeProperties dnp = new DataNodeProperties(
newDn,
newconf,
args,
secureResources,
newDn.getIpcPort());
dataNodes.add(dnp);
numDataNodes++; numDataNodes++;
setDataNodeStorageCapacities(
dataNodes.lastIndexOf(dnp),
newDn,
storageCap.toArray(new long[][]{}));
return true; return true;
} }

View File

@ -25,16 +25,25 @@
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.MiniDFSCluster.NameNodeInfo; import org.apache.hadoop.hdfs.MiniDFSCluster.NameNodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
import org.apache.hadoop.test.PathUtils; import org.apache.hadoop.test.PathUtils;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import com.google.common.base.Preconditions;
/** /**
* Tests MiniDFS cluster setup/teardown and isolation. * Tests MiniDFS cluster setup/teardown and isolation.
* Every instance is brought up with a new data dir, to ensure that * Every instance is brought up with a new data dir, to ensure that
@ -78,6 +87,116 @@ public void testClusterWithoutSystemProperties() throws Throwable {
} }
} }
/**
* Tests storage capacity setting still effective after cluster restart.
*/
@Test(timeout=100000)
public void testClusterSetStorageCapacity() throws Throwable {
final Configuration conf = new HdfsConfiguration();
final int numDatanodes = 1;
final int defaultBlockSize = 1024;
final int blocks = 100;
final int blocksSize = 1024;
final int fileLen = blocks * blocksSize;
final long capcacity = defaultBlockSize * 2 * fileLen;
final long[] capacities = new long[] {capcacity, 2 * capcacity};
final MiniDFSCluster cluster = newCluster(
conf,
numDatanodes,
capacities,
defaultBlockSize,
fileLen);
verifyStorageCapacity(cluster, capacities);
/* restart all data nodes */
cluster.restartDataNodes();
cluster.waitActive();
verifyStorageCapacity(cluster, capacities);
/* restart all name nodes */
cluster.restartNameNodes();
cluster.waitActive();
verifyStorageCapacity(cluster, capacities);
/* restart all name nodes firstly and data nodes then */
cluster.restartNameNodes();
cluster.restartDataNodes();
cluster.waitActive();
verifyStorageCapacity(cluster, capacities);
/* restart all data nodes firstly and name nodes then */
cluster.restartDataNodes();
cluster.restartNameNodes();
cluster.waitActive();
verifyStorageCapacity(cluster, capacities);
}
private void verifyStorageCapacity(
final MiniDFSCluster cluster,
final long[] capacities) throws IOException {
FsVolumeImpl source = null;
FsVolumeImpl dest = null;
/* verify capacity */
for (int i = 0; i < cluster.getDataNodes().size(); i++) {
final DataNode dnNode = cluster.getDataNodes().get(i);
try (FsDatasetSpi.FsVolumeReferences refs = dnNode.getFSDataset()
.getFsVolumeReferences()) {
source = (FsVolumeImpl) refs.get(0);
dest = (FsVolumeImpl) refs.get(1);
assertEquals(capacities[0], source.getCapacity());
assertEquals(capacities[1], dest.getCapacity());
}
}
}
private MiniDFSCluster newCluster(
final Configuration conf,
final int numDatanodes,
final long[] storageCapacities,
final int defaultBlockSize,
final int fileLen)
throws IOException, InterruptedException, TimeoutException {
conf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true);
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, defaultBlockSize);
conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, defaultBlockSize);
conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
final String fileName = "/" + UUID.randomUUID().toString();
final Path filePath = new Path(fileName);
Preconditions.checkNotNull(storageCapacities);
Preconditions.checkArgument(
storageCapacities.length == 2,
"need to specify capacities for two storages.");
/* Write a file and restart the cluster */
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(numDatanodes)
.storageCapacities(storageCapacities)
.storageTypes(new StorageType[]{StorageType.DISK, StorageType.DISK})
.storagesPerDatanode(2)
.build();
cluster.waitActive();
final short replicationFactor = (short) 1;
final Random r = new Random();
FileSystem fs = cluster.getFileSystem(0);
DFSTestUtil.createFile(
fs,
filePath,
fileLen,
replicationFactor,
r.nextLong());
DFSTestUtil.waitReplication(fs, filePath, replicationFactor);
return cluster;
}
@Test(timeout=100000) @Test(timeout=100000)
public void testIsClusterUpAfterShutdown() throws Throwable { public void testIsClusterUpAfterShutdown() throws Throwable {
Configuration conf = new HdfsConfiguration(); Configuration conf = new HdfsConfiguration();