HDFS-1686. Federation: Add more Balancer tests with federation setting. Contributed by Bharat Viswanadham
This commit is contained in:
parent
83798f15f8
commit
850b2f2567
@ -100,6 +100,19 @@ private static class Suite {
|
|||||||
replication = 1;
|
replication = 1;
|
||||||
this.parameters = parameters;
|
this.parameters = parameters;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Suite(MiniDFSCluster cluster, final int nNameNodes, final int nDataNodes,
|
||||||
|
BalancerParameters parameters, Configuration conf, short
|
||||||
|
replicationFactor) throws IOException {
|
||||||
|
this.conf = conf;
|
||||||
|
this.cluster = cluster;
|
||||||
|
clients = new ClientProtocol[nNameNodes];
|
||||||
|
for(int i = 0; i < nNameNodes; i++) {
|
||||||
|
clients[i] = cluster.getNameNode(i).getRpcServer();
|
||||||
|
}
|
||||||
|
replication = replicationFactor;
|
||||||
|
this.parameters = parameters;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* create a file with a length of <code>fileLen</code> */
|
/* create a file with a length of <code>fileLen</code> */
|
||||||
@ -154,7 +167,7 @@ static void wait(final ClientProtocol[] clients,
|
|||||||
|
|
||||||
static void runBalancer(Suite s,
|
static void runBalancer(Suite s,
|
||||||
final long totalUsed, final long totalCapacity) throws Exception {
|
final long totalUsed, final long totalCapacity) throws Exception {
|
||||||
final double avg = totalUsed*100.0/totalCapacity;
|
double avg = totalUsed*100.0/totalCapacity;
|
||||||
|
|
||||||
LOG.info("BALANCER 0: totalUsed=" + totalUsed
|
LOG.info("BALANCER 0: totalUsed=" + totalUsed
|
||||||
+ ", totalCapacity=" + totalCapacity
|
+ ", totalCapacity=" + totalCapacity
|
||||||
@ -180,6 +193,9 @@ static void runBalancer(Suite s,
|
|||||||
for(boolean balanced = false; !balanced; i++) {
|
for(boolean balanced = false; !balanced; i++) {
|
||||||
final long[] used = new long[s.cluster.getDataNodes().size()];
|
final long[] used = new long[s.cluster.getDataNodes().size()];
|
||||||
final long[] cap = new long[used.length];
|
final long[] cap = new long[used.length];
|
||||||
|
final long[][] bpUsed = new long[s.clients.length][s.cluster
|
||||||
|
.getDataNodes().size()];
|
||||||
|
|
||||||
|
|
||||||
for(int n = 0; n < s.clients.length; n++) {
|
for(int n = 0; n < s.clients.length; n++) {
|
||||||
final DatanodeInfo[] datanodes = s.clients[n].getDatanodeReport(
|
final DatanodeInfo[] datanodes = s.clients[n].getDatanodeReport(
|
||||||
@ -199,25 +215,52 @@ static void runBalancer(Suite s,
|
|||||||
Assert.assertEquals(used[d], datanodes[d].getDfsUsed());
|
Assert.assertEquals(used[d], datanodes[d].getDfsUsed());
|
||||||
Assert.assertEquals(cap[d], datanodes[d].getCapacity());
|
Assert.assertEquals(cap[d], datanodes[d].getCapacity());
|
||||||
}
|
}
|
||||||
|
bpUsed[n][d] = datanodes[d].getBlockPoolUsed();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
balanced = true;
|
balanced = true;
|
||||||
for(int d = 0; d < used.length; d++) {
|
for(int d = 0; d < used.length; d++) {
|
||||||
final double p = used[d]*100.0/cap[d];
|
double p;
|
||||||
balanced = p <= avg + s.parameters.getThreshold();
|
if(s.parameters.getBalancingPolicy() == BalancingPolicy.Pool.INSTANCE) {
|
||||||
if (!balanced) {
|
for (int k = 0; k < s.parameters.getBlockPools().size(); k++) {
|
||||||
if (i % 100 == 0) {
|
avg = TestBalancer.sum(bpUsed[k])*100/totalCapacity;
|
||||||
LOG.warn("datanodes " + d + " is not yet balanced: "
|
p = bpUsed[k][d] * 100.0 / cap[d];
|
||||||
+ "used=" + used[d] + ", cap=" + cap[d] + ", avg=" + avg);
|
balanced = p <= avg + s.parameters.getThreshold();
|
||||||
LOG.warn("TestBalancer.sum(used)=" + TestBalancer.sum(used)
|
if (!balanced) {
|
||||||
+ ", TestBalancer.sum(cap)=" + TestBalancer.sum(cap));
|
if (i % 100 == 0) {
|
||||||
|
LOG.warn("datanodes " + d + " is not yet balanced: "
|
||||||
|
+ "block pool used=" + bpUsed[d][k] + ", cap=" + cap[d] +
|
||||||
|
", avg=" + avg);
|
||||||
|
LOG.warn("sum(blockpoolUsed)=" + TestBalancer.sum(bpUsed[k])
|
||||||
|
+ ", sum(cap)=" + TestBalancer.sum(cap));
|
||||||
|
}
|
||||||
|
sleep(100);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (!balanced) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
p = used[d] * 100.0 / cap[d];
|
||||||
|
balanced = p <= avg + s.parameters.getThreshold();
|
||||||
|
if (!balanced) {
|
||||||
|
if (i % 100 == 0) {
|
||||||
|
LOG.warn("datanodes " + d + " is not yet balanced: "
|
||||||
|
+ "used=" + used[d] + ", cap=" + cap[d] + ", avg=" + avg);
|
||||||
|
LOG.warn("sum(used)=" + TestBalancer.sum(used)
|
||||||
|
+ ", sum(cap)=" + TestBalancer.sum(cap));
|
||||||
|
}
|
||||||
|
sleep(100);
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
sleep(100);
|
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
LOG.info("BALANCER 6");
|
LOG.info("BALANCER 6");
|
||||||
// cluster is balanced, verify that only selected blockpools were touched
|
// cluster is balanced, verify that only selected blockpools were touched
|
||||||
Map<Integer, DatanodeStorageReport[]> postBalancerPoolUsages =
|
Map<Integer, DatanodeStorageReport[]> postBalancerPoolUsages =
|
||||||
@ -425,19 +468,26 @@ private void unevenDistribution(final int nNameNodes,
|
|||||||
* It then adds an empty node and start balancing.
|
* It then adds an empty node and start balancing.
|
||||||
*
|
*
|
||||||
* @param nNameNodes Number of NameNodes
|
* @param nNameNodes Number of NameNodes
|
||||||
* @param capacities Capacities of the datanodes
|
|
||||||
* @param racks Rack names
|
* @param racks Rack names
|
||||||
* @param newCapacity the capacity of the new DataNode
|
|
||||||
* @param newRack the rack for the new DataNode
|
* @param newRack the rack for the new DataNode
|
||||||
* @param conf Configuration
|
* @param conf Configuration
|
||||||
|
* @param nNameNodestoBalance noOfNameNodestoBalance
|
||||||
|
* @param balancerParameters BalancerParameters
|
||||||
*/
|
*/
|
||||||
private void runTest(final int nNameNodes, long[] capacities, String[] racks,
|
private void runTest(final int nNameNodes, String[] racks,
|
||||||
long newCapacity, String newRack, Configuration conf) throws Exception {
|
String[] newRack, Configuration conf,
|
||||||
final int nDataNodes = capacities.length;
|
int nNameNodestoBalance,
|
||||||
|
BalancerParameters balancerParameters)
|
||||||
|
throws Exception {
|
||||||
|
final int nDataNodes = racks.length;
|
||||||
|
final long[] capacities = new long[nDataNodes];
|
||||||
|
Arrays.fill(capacities, CAPACITY);
|
||||||
LOG.info("nNameNodes=" + nNameNodes + ", nDataNodes=" + nDataNodes);
|
LOG.info("nNameNodes=" + nNameNodes + ", nDataNodes=" + nDataNodes);
|
||||||
Assert.assertEquals(nDataNodes, racks.length);
|
Assert.assertEquals(nDataNodes, racks.length);
|
||||||
|
|
||||||
LOG.info("RUN_TEST -1");
|
LOG.info("RUN_TEST -1: start a cluster with nNameNodes=" + nNameNodes
|
||||||
|
+ ", nDataNodes=" + nDataNodes);
|
||||||
|
|
||||||
final MiniDFSCluster cluster = new MiniDFSCluster
|
final MiniDFSCluster cluster = new MiniDFSCluster
|
||||||
.Builder(new Configuration(conf))
|
.Builder(new Configuration(conf))
|
||||||
.nnTopology(MiniDFSNNTopology.simpleFederatedTopology(nNameNodes))
|
.nnTopology(MiniDFSNNTopology.simpleFederatedTopology(nNameNodes))
|
||||||
@ -451,44 +501,63 @@ private void runTest(final int nNameNodes, long[] capacities, String[] racks,
|
|||||||
try {
|
try {
|
||||||
cluster.waitActive();
|
cluster.waitActive();
|
||||||
LOG.info("RUN_TEST 1");
|
LOG.info("RUN_TEST 1");
|
||||||
final Suite s =
|
|
||||||
new Suite(cluster, nNameNodes, nDataNodes,
|
Suite s;
|
||||||
BalancerParameters.DEFAULT, conf);
|
|
||||||
|
Set<String> blockpools = new HashSet<>();
|
||||||
|
if(balancerParameters == null) {
|
||||||
|
s = new Suite(cluster, nNameNodes, nDataNodes,
|
||||||
|
BalancerParameters.DEFAULT, conf);
|
||||||
|
|
||||||
|
} else {
|
||||||
|
for (int i=0; i< nNameNodestoBalance; i++) {
|
||||||
|
blockpools.add(cluster.getNamesystem(i).getBlockPoolId());
|
||||||
|
}
|
||||||
|
BalancerParameters.Builder b =
|
||||||
|
new BalancerParameters.Builder();
|
||||||
|
b.setBalancingPolicy(balancerParameters.getBalancingPolicy());
|
||||||
|
b.setBlockpools(blockpools);
|
||||||
|
BalancerParameters params = b.build();
|
||||||
|
s = new Suite(cluster, nNameNodes, nDataNodes, params, conf, (short)2);
|
||||||
|
}
|
||||||
long totalCapacity = TestBalancer.sum(capacities);
|
long totalCapacity = TestBalancer.sum(capacities);
|
||||||
|
|
||||||
LOG.info("RUN_TEST 2");
|
LOG.info("RUN_TEST 2: create files");
|
||||||
// fill up the cluster to be 30% full
|
// fill up the cluster to be 30% full
|
||||||
final long totalUsed = totalCapacity*3/10;
|
final long totalUsed = (totalCapacity * s.replication)*3/10;
|
||||||
final long size = (totalUsed/nNameNodes)/s.replication;
|
final long size = (totalUsed/nNameNodes)/s.replication;
|
||||||
for(int n = 0; n < nNameNodes; n++) {
|
for(int n = 0; n < nNameNodes; n++) {
|
||||||
createFile(s, n, size);
|
createFile(s, n, size);
|
||||||
}
|
}
|
||||||
|
|
||||||
LOG.info("RUN_TEST 3");
|
LOG.info("RUN_TEST 3: " + newRack.length + " new datanodes");
|
||||||
// start up an empty node with the same capacity and on the same rack
|
// start up an empty node with the same capacity and on the same rack
|
||||||
cluster.startDataNodes(conf, 1, true, null,
|
final long[] newCapacity = new long[newRack.length];
|
||||||
new String[]{newRack}, new long[]{newCapacity});
|
Arrays.fill(newCapacity, CAPACITY);
|
||||||
|
cluster.startDataNodes(conf, newCapacity.length, true, null,
|
||||||
|
newRack, newCapacity);
|
||||||
|
|
||||||
totalCapacity += newCapacity;
|
totalCapacity += TestBalancer.sum(newCapacity);
|
||||||
|
|
||||||
LOG.info("RUN_TEST 4");
|
LOG.info("RUN_TEST 4: run Balancer");
|
||||||
// run RUN_TEST and validate results
|
// run RUN_TEST and validate results
|
||||||
runBalancer(s, totalUsed, totalCapacity);
|
runBalancer(s, totalUsed, totalCapacity);
|
||||||
LOG.info("RUN_TEST 5");
|
LOG.info("RUN_TEST 5");
|
||||||
} finally {
|
} finally {
|
||||||
cluster.shutdown();
|
cluster.shutdown();
|
||||||
}
|
}
|
||||||
LOG.info("RUN_TEST 6");
|
LOG.info("RUN_TEST 6: done");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/** Test a cluster with even distribution,
|
/** Test a cluster with even distribution,
|
||||||
* then a new empty node is added to the cluster
|
* then a new empty node is added to the cluster.
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testBalancer() throws Exception {
|
public void testTwoOneOne() throws Exception {
|
||||||
final Configuration conf = createConf();
|
final Configuration conf = createConf();
|
||||||
runTest(2, new long[]{CAPACITY}, new String[]{RACK0},
|
runTest(2, new String[]{RACK0}, new String[] {RACK0}, conf,
|
||||||
CAPACITY/2, RACK0, conf);
|
2, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Test unevenly distributed cluster */
|
/** Test unevenly distributed cluster */
|
||||||
@ -517,4 +586,36 @@ public void testBalancing2OutOf3Blockpools() throws Exception {
|
|||||||
5 * CAPACITY / 100, 10 * CAPACITY / 100 }, new long[] { CAPACITY,
|
5 * CAPACITY / 100, 10 * CAPACITY / 100 }, new long[] { CAPACITY,
|
||||||
CAPACITY, CAPACITY }, new String[] { RACK0, RACK1, RACK2 }, conf);
|
CAPACITY, CAPACITY }, new String[] { RACK0, RACK1, RACK2 }, conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** Even distribution with 2 Namenodes, 4 Datanodes and 2 new Datanodes. */
|
||||||
|
@Test(timeout = 600000)
|
||||||
|
public void testTwoFourTwo() throws Exception {
|
||||||
|
final Configuration conf = createConf();
|
||||||
|
runTest(2, new String[]{RACK0, RACK0, RACK1, RACK1},
|
||||||
|
new String[]{RACK2, RACK2}, conf, 2, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout=600000)
|
||||||
|
public void testBalancingBlockpoolsWithBlockPoolPolicy() throws Exception {
|
||||||
|
final Configuration conf = createConf();
|
||||||
|
BalancerParameters balancerParameters = new BalancerParameters.Builder()
|
||||||
|
.setBalancingPolicy(BalancingPolicy.Pool.INSTANCE).build();
|
||||||
|
runTest(2, new String[]{RACK0, RACK0, RACK1, RACK1},
|
||||||
|
new String[]{RACK2, RACK2}, conf, 2,
|
||||||
|
balancerParameters);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 600000)
|
||||||
|
public void test1OutOf2BlockpoolsWithBlockPoolPolicy()
|
||||||
|
throws
|
||||||
|
Exception {
|
||||||
|
final Configuration conf = createConf();
|
||||||
|
BalancerParameters balancerParameters = new BalancerParameters.Builder()
|
||||||
|
.setBalancingPolicy(BalancingPolicy.Pool.INSTANCE).build();
|
||||||
|
runTest(2, new String[]{RACK0, RACK0, RACK1, RACK1},
|
||||||
|
new String[]{RACK2, RACK2}, conf, 1,
|
||||||
|
balancerParameters);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user